From bfc75e0db85e56e0374ec6cf5c45c0da7650512b Mon Sep 17 00:00:00 2001 From: Terrtia Date: Mon, 3 Jun 2019 17:29:20 +0200 Subject: [PATCH] fix: [server] fix extended-types connection (allow concurrent 2/254) + fix extended types metadata + save connected types/extended in DB --- server/documentation/Database.md | 15 +++++ server/server.py | 102 +++++++++++++++++++++-------- server/web/Flask_server.py | 12 +++- server/workers/workers_2/worker.py | 17 +++-- 4 files changed, 113 insertions(+), 33 deletions(-) diff --git a/server/documentation/Database.md b/server/documentation/Database.md index 29888e4..199adc5 100644 --- a/server/documentation/Database.md +++ b/server/documentation/Database.md @@ -24,7 +24,21 @@ sensor registrations, management of decoding protocols and dispatching to adequa | server:accepted_type | **accepted type** | | server:accepted_extended_type | **accepted extended type** | +###### Connection Manager +| Set Key | Value | +| --- | --- | +| active_connection | **uuid** | +| | | +| active_connection:**type** | **uuid** | +| active_connection_extended_type:**uuid** | **extended type** | +| | | +| active_uuid_type2:**uuid** | **session uuid** | +| | | +| map:active_connection-uuid-session_uuid:**uuid** | **session uuid** | +| Set Key | Field | Value | +| --- | --- | --- | +| map:session-uuid_active_extended_type | **session_uuid** | **extended_type** | ### Stats | Zset Key | Field | Value | @@ -66,6 +80,7 @@ sensor registrations, management of decoding protocols and dispatching to adequa | Set Key | Value | | --- | --- | | all_types_by_uuid:**uuid** | **type** | +| all_extended_types_by_uuid:**uuid** | **type** | ### analyzers ###### metadata diff --git a/server/server.py b/server/server.py index 4217724..fcac786 100755 --- a/server/server.py +++ b/server/server.py @@ -136,17 +136,30 @@ class D4_Server(Protocol, TimeoutMixin): def connectionLost(self, reason): redis_server_stream.sadd('ended_session', self.session_uuid) self.setTimeout(None) + if not self.duplicate: - if self.type == 254: - type = 2 + if self.type == 254 or self.type == 2: + redis_server_stream.srem('active_uuid_type{}:{}'.format(self.type, self.uuid), self.session_uuid) + if not redis_server_stream.exists('active_uuid_type{}:{}'.format(self.type, self.uuid)): + redis_server_stream.srem('active_connection:{}'.format(self.type), self.uuid) + redis_server_stream.srem('active_connection_by_uuid:{}'.format(self.uuid), self.type) + # clean extended type + current_extended_type = redis_server_stream.hget('map:session-uuid_active_extended_type', self.session_uuid) + if current_extended_type: + redis_server_stream.hdel('map:session-uuid_active_extended_type', self.session_uuid) + redis_server_stream.srem('active_connection_extended_type:{}'.format(self.uuid), current_extended_type) + else: - type = self.type - redis_server_stream.srem('active_connection:{}'.format(type), '{}:{}'.format(self.ip, self.uuid)) - redis_server_stream.srem('active_connection', '{}'.format(self.uuid)) + redis_server_stream.srem('active_connection:{}'.format(self.type), self.uuid) + redis_server_stream.srem('active_connection_by_uuid:{}'.format(self.uuid), self.type) + if self.uuid: redis_server_stream.srem('map:active_connection-uuid-session_uuid:{}'.format(self.uuid), self.session_uuid) - logger.debug('Connection closed: session_uuid={}'.format(self.session_uuid)) - dict_all_connection.pop(self.session_uuid) + if not redis_server_stream.exists('active_connection_by_uuid:{}'.format(self.uuid)): + redis_server_stream.srem('active_connection', self.uuid) + + logger.debug('Connection closed: session_uuid={}'.format(self.session_uuid)) + dict_all_connection.pop(self.session_uuid) def unpack_header(self, data): data_header = {} @@ -237,29 +250,43 @@ class D4_Server(Protocol, TimeoutMixin): # auto kill connection # TODO: map type if self.first_connection: self.first_connection = False - if redis_server_stream.sismember('active_connection:{}'.format(data_header['type']), '{}:{}'.format(ip, data_header['uuid_header'])): + if data_header['type'] == 2: + redis_server_stream.sadd('active_uuid_type2:{}'.format(data_header['uuid_header']), self.session_uuid) + + # type 254, check if previous type 2 saved + elif data_header['type'] == 254: + logger.warning('a type 2 packet must be sent, ip={} uuid={} type={} session_uuid={}'.format(ip, data_header['uuid_header'], data_header['type'], self.session_uuid)) + redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'Error', 'Error: a type 2 packet must be sent, type={}'.format(data_header['type'])) + self.duplicate = True + self.transport.abortConnection() + return 1 + + # accept only one type/by uuid (except for type 2/254) + elif redis_server_stream.sismember('active_connection:{}'.format(data_header['type']), '{}'.format(data_header['uuid_header'])): # same IP-type for an UUID logger.warning('is using the same UUID for one type, ip={} uuid={} type={} session_uuid={}'.format(ip, data_header['uuid_header'], data_header['type'], self.session_uuid)) redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'Error', 'Error: This UUID is using the same UUID for one type={}'.format(data_header['type'])) self.duplicate = True self.transport.abortConnection() return 1 - else: - #self.version = None - # type 254, check if previous type 2 saved - if data_header['type'] == 254: - logger.warning('a type 2 packet must be sent, ip={} uuid={} type={} session_uuid={}'.format(ip, data_header['uuid_header'], data_header['type'], self.session_uuid)) - redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'Error', 'Error: a type 2 packet must be sent, type={}'.format(data_header['type'])) - self.duplicate = True - self.transport.abortConnection() - return 1 - self.type = data_header['type'] - self.uuid = data_header['uuid_header'] - #active Connection - redis_server_stream.sadd('active_connection:{}'.format(self.type), '{}:{}'.format(ip, self.uuid)) - redis_server_stream.sadd('active_connection', '{}'.format(self.uuid)) - # map session_uuid/uuid - redis_server_stream.sadd('map:active_connection-uuid-session_uuid:{}'.format(self.uuid), self.session_uuid) + + self.type = data_header['type'] + self.uuid = data_header['uuid_header'] + + # worker entry point: map type:session_uuid + redis_server_stream.sadd('session_uuid:{}'.format(data_header['type']), self.session_uuid.encode()) + + ## save active connection ## + #active Connection + redis_server_stream.sadd('active_connection:{}'.format(self.type), self.uuid) + redis_server_stream.sadd('active_connection_by_uuid:{}'.format(self.uuid), self.type) + redis_server_stream.sadd('active_connection', self.uuid) + # map session_uuid/uuid + redis_server_stream.sadd('map:active_connection-uuid-session_uuid:{}'.format(self.uuid), self.session_uuid) + + # map all type by uuid ## TODO: # FIXME: put me in workers ?????? + redis_server_metadata.sadd('all_types_by_uuid:{}'.format(data_header['uuid_header']), data_header['type']) + ## ## # check if type change if self.data_saved: @@ -269,6 +296,30 @@ class D4_Server(Protocol, TimeoutMixin): if self.type == 2 and data_header['type'] == 254: self.update_stream_type = True self.type = data_header['type'] + #redis_server_stream.hdel('map-type:session_uuid-uuid:2', self.session_uuid) # # TODO: to remove / refractor + redis_server_stream.srem('active_uuid_type2:{}'.format(self.uuid), self.session_uuid) + + # remove type 2 connection + if not redis_server_stream.exists('active_uuid_type2:{}'.format(self.uuid)): + redis_server_stream.srem('active_connection:2', self.uuid) + redis_server_stream.srem('active_connection_by_uuid:{}'.format(self.uuid), 2) + + ## save active connection ## + #active Connection + redis_server_stream.sadd('active_connection:{}'.format(self.type), self.uuid) + redis_server_stream.sadd('active_connection_by_uuid:{}'.format(self.uuid), self.type) + redis_server_stream.sadd('active_connection', self.uuid) + + redis_server_stream.sadd('active_uuid_type254:{}'.format(self.uuid), self.session_uuid) + + # map all type by uuid ## TODO: # FIXME: put me in workers ?????? + redis_server_metadata.sadd('all_types_by_uuid:{}'.format(data_header['uuid_header']), data_header['type']) + ## ## + + + #redis_server_stream.hset('map-type:session_uuid-uuid:{}'.format(data_header['type']), self.session_uuid, data_header['uuid_header']) + + # Type Error else: logger.warning('Unexpected type change, type={} new type={}, ip={} uuid={} session_uuid={}'.format(ip, data_header['uuid_header'], data_header['type'], self.session_uuid)) @@ -408,9 +459,6 @@ class D4_Server(Protocol, TimeoutMixin): self.data_saved = True if self.update_stream_type: - redis_server_stream.sadd('session_uuid:{}'.format(data_header['type']), self.session_uuid.encode()) - redis_server_stream.hset('map-type:session_uuid-uuid:{}'.format(data_header['type']), self.session_uuid, data_header['uuid_header']) - redis_server_metadata.sadd('all_types_by_uuid:{}'.format(data_header['uuid_header']), data_header['type']) if not redis_server_metadata.hexists('metadata_type_by_uuid:{}:{}'.format(data_header['uuid_header'], data_header['type']), 'first_seen'): redis_server_metadata.hset('metadata_type_by_uuid:{}:{}'.format(data_header['uuid_header'], data_header['type']), 'first_seen', data_header['timestamp']) diff --git a/server/web/Flask_server.py b/server/web/Flask_server.py index 501f976..6b3f77f 100755 --- a/server/web/Flask_server.py +++ b/server/web/Flask_server.py @@ -261,7 +261,17 @@ def sensors_status(): description = redis_server_metadata.hget('metadata_uuid:{}'.format(result), 'description') if not description: description = '' - l_uuid_types = list(redis_server_metadata.smembers('all_types_by_uuid:{}'.format(result))) + l_uuid_types = redis_server_metadata.smembers('all_types_by_uuid:{}'.format(result)) + for type in l_uuid_types: + if redis_server_stream.sismember('active_connection:{}'.format(type), result): + print('connected: {}'.format(type)) + if '254' in l_uuid_types: + extended_type = redis_server_metadata.smembers('all_extended_types_by_uuid:{}'.format(result)) + for extended in extended_type: + if redis_server_stream.sismember('active_connection_extended_type:{}'.format(result), extended): + print('connected: {}'.format(extended)) + l_uuid_types.update(extended_type) + l_uuid_types = list(l_uuid_types) l_uuid_types.sort() if redis_server_metadata.sismember('blacklist_ip_by_uuid', result): Error = "All IP using this UUID are Blacklisted" diff --git a/server/workers/workers_2/worker.py b/server/workers/workers_2/worker.py index daf2a98..14f2745 100755 --- a/server/workers/workers_2/worker.py +++ b/server/workers/workers_2/worker.py @@ -60,10 +60,16 @@ def clean_db(session_uuid): clean_stream(stream_defined, type_defined, session_uuid) redis_server_stream.srem('ended_session', session_uuid) redis_server_stream.srem('working_session_uuid:{}'.format(type_meta_header), session_uuid) + # clean extended type (used) + redis_server_stream.hdel('map:session-uuid_active_extended_type', session_uuid) + try: + redis_server_stream.srem('active_connection_extended_type:{}'.format(uuid), extended_type) + except Exception as e: + print(e) def clean_stream(stream_name, type, session_uuid): redis_server_stream.srem('session_uuid:{}'.format(type), session_uuid) - redis_server_stream.hdel('map-type:session_uuid-uuid:{}'.format(type), session_uuid) + #redis_server_stream.hdel('map-type:session_uuid-uuid:{}'.format(type), session_uuid) redis_server_stream.delete(stream_name) if __name__ == "__main__": @@ -151,6 +157,10 @@ if __name__ == "__main__": clean_db(session_uuid) sys.exit(1) + # create active_connection for extended type + redis_server_stream.sadd('active_connection_extended_type:{}'.format(uuid), extended_type) + + redis_server_stream.hset('map:session-uuid_active_extended_type', session_uuid, extended_type) #### Handle Specific MetaTypes #### # Use Specific Handler defined @@ -174,11 +184,8 @@ if __name__ == "__main__": buffer = b'' type_handler.test() - # create active_connection for extended type - #redis_server_stream.sadd('active_connection_extended_type:{}', '{}'.format(self.uuid)) - # update uuid: extended type list - #redis_server_metadata.sadd('all_extended_types_by_uuid:{}'.format(uuid), extended_type) + redis_server_metadata.sadd('all_extended_types_by_uuid:{}'.format(uuid), extended_type) # update metadata extended type time_val = int(time.time())