#!/usr/bin/env python3

import os
import sys
import uuid
import hmac
import stat
import redis
import struct
import time
import datetime
import argparse
import logging
import logging.handlers
import configparser

from twisted.internet import ssl, task, protocol, endpoints, defer
from twisted.python import log
from twisted.python.modules import getModule
from twisted.internet.protocol import Protocol
from twisted.protocols.policies import TimeoutMixin

hmac_reset = bytearray(32)
hmac_key = os.getenv('D4_HMAC_KEY', b'private key to change')

accepted_type = [1, 2, 4, 8, 254]
accepted_extended_type = ['ja3-jl']

all_server_modes = ('registration', 'shared-secret')

timeout_time = 30
header_size = 62
data_default_size_limit = 1000000
default_max_entries_by_stream = 10000

host_redis_stream = os.getenv('D4_REDIS_STREAM_HOST', "localhost")
port_redis_stream = int(os.getenv('D4_REDIS_STREAM_PORT', 6379))

host_redis_metadata = os.getenv('D4_REDIS_METADATA_HOST', "localhost")
port_redis_metadata = int(os.getenv('D4_REDIS_METADATA_PORT', 6380))
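
# D4 header layout (header_size = 62 bytes, cf. D4_Server.unpack_header):
#   version    1 byte   (unsigned char)
#   type       1 byte   (unsigned char)
#   uuid      16 bytes  (sensor UUID v4)
#   timestamp  8 bytes  (unsigned long long)
#   hmac      32 bytes  (HMAC-SHA256, field zeroed during computation)
#   size       4 bytes  (unsigned int, payload size)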

### REDIS ###

redis_server_stream = redis.StrictRedis(
    host=host_redis_stream,
    port=port_redis_stream,
    db=0)

redis_server_metadata = redis.StrictRedis(
    host=host_redis_metadata,
    port=port_redis_metadata,
    db=0)

try:
    redis_server_stream.ping()
except redis.exceptions.ConnectionError:
    print('Error: Redis server {}:{}, ConnectionError'.format(host_redis_stream, port_redis_stream))
    sys.exit(1)

try:
    redis_server_metadata.ping()
except redis.exceptions.ConnectionError:
    print('Error: Redis server {}:{}, ConnectionError'.format(host_redis_metadata, port_redis_metadata))
    sys.exit(1)

### REDIS ###

# set hmac default key
redis_server_metadata.set('server:hmac_default_key', hmac_key)

# init redis_server_metadata
for type in accepted_type:
    redis_server_metadata.sadd('server:accepted_type', type)
for type in accepted_extended_type:
    redis_server_metadata.sadd('server:accepted_extended_type', type)

dict_all_connection = {}

### FUNCTIONS ###

# kick sensors
def kick_sensors():
    for client_uuid in redis_server_stream.smembers('server:sensor_to_kick'):
        client_uuid = client_uuid.decode()
        for session_uuid in redis_server_stream.smembers('map:active_connection-uuid-session_uuid:{}'.format(client_uuid)):
            session_uuid = session_uuid.decode()
            logger.warning('Sensor kicked uuid={}, session_uuid={}'.format(client_uuid, session_uuid))
            redis_server_stream.set('temp_blacklist_uuid:{}'.format(client_uuid), 'some random string')
            redis_server_stream.expire('temp_blacklist_uuid:{}'.format(client_uuid), 30)
            dict_all_connection[session_uuid].transport.abortConnection()
        redis_server_stream.srem('server:sensor_to_kick', client_uuid)
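
# Note: the 30s 'temp_blacklist_uuid:<uuid>' key set above also prevents a
# kicked sensor from reconnecting immediately (checked again in
# D4_Server.check_connection_validity).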

def is_valid_uuid_v4(header_uuid):
    try:
        uuid_test = uuid.UUID(hex=header_uuid, version=4)
        return uuid_test.hex == header_uuid
    except Exception:
        logger.info('Not UUID v4: uuid={}'.format(header_uuid))
        return False

# # TODO: check timestamp
def is_valid_header(uuid_to_check, type):
    if is_valid_uuid_v4(uuid_to_check):
        if redis_server_metadata.sismember('server:accepted_type', type):
            return True
        else:
            logger.warning('Invalid type, the server does not accept this type: {}, uuid={}'.format(type, uuid_to_check))
            return False
    else:
        logger.info('Invalid Header, uuid={}'.format(uuid_to_check))
        return False

def extract_ip(ip_string):
    # remove the interface suffix, if any
    ip_string = ip_string.split('%')[0]
    # extract the IPv4 address from an IPv4-mapped IPv6 address
    if '.' in ip_string:
        return ip_string.split(':')[-1]
    # plain IPv6
    else:
        return ip_string

def server_mode_registration(header_uuid):
    # only accept registered uuid
    if server_mode == 'registration':
        if not redis_server_metadata.sismember('registered_uuid', header_uuid):
            error_msg = 'Not registered UUID={}, connection closed'.format(header_uuid)
            print(error_msg)
            logger.warning(error_msg)
            #redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'Error', 'Error: This UUID is temporarily blacklisted')
            return False
        else:
            return True
    else:
        return True

def is_client_ip_blacklisted():
    pass

def is_uuid_blacklisted(uuid_to_check):
    return redis_server_metadata.sismember('blacklist_uuid', uuid_to_check)

# return True if not blocked
# False if blacklisted
def check_blacklist():
    pass

# Kill Connection + create log
#def manual_abort_connection(self, message, log_level='WARNING'):
#    logger.log(message)
#    self.transport.abortConnection()
#    return 1

### ###
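
# Reference only (assumption): in 'registration' mode, a sensor uuid can be
# allowed by adding it to the 'registered_uuid' set in the metadata Redis
# (default port 6380), e.g.:
#   redis-cli -p 6380 SADD registered_uuid <uuid4-hex>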


class D4_Server(Protocol, TimeoutMixin):

    def __init__(self):
        self.buffer = b''
        self.setTimeout(timeout_time)
        self.session_uuid = str(uuid.uuid4())
        self.data_saved = False
        self.update_stream_type = True
        self.first_connection = True
        self.duplicate = False
        self.ip = None
        self.source_port = None
        self.stream_max_size = None
        self.hmac_key = None
        #self.version = None
        self.type = None
        self.uuid = None
        logger.debug('New session: session_uuid={}'.format(self.session_uuid))
        dict_all_connection[self.session_uuid] = self
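
    # One D4_Server instance exists per TCP connection; registering it in
    # dict_all_connection by session_uuid lets kick_sensors() abort the
    # transport of a given sensor uuid.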

    def dataReceived(self, data):
        # check and kick sensor by uuid
        kick_sensors()

        self.resetTimeout()
        if self.first_connection or self.ip is None:
            client_info = self.transport.client
            self.ip = extract_ip(client_info[0])
            self.source_port = client_info[1]
            logger.debug('New connection, ip={}, port={} session_uuid={}'.format(self.ip, self.source_port, self.session_uuid))
        # check blacklisted_ip
        if redis_server_metadata.sismember('blacklist_ip', self.ip):
            self.transport.abortConnection()
            logger.warning('Blacklisted IP={}, connection closed'.format(self.ip))
        else:
            # process data
            self.process_header(data, self.ip, self.source_port)

    def timeoutConnection(self):
        if self.uuid is None:
            # # TODO: ban auto
            logger.warning('Timeout, no D4 header sent, session_uuid={}, connection closed'.format(self.session_uuid))
            self.transport.abortConnection()
        else:
            self.resetTimeout()
            self.buffer = b''
            logger.debug('buffer timeout, session_uuid={}'.format(self.session_uuid))

    def connectionMade(self):
        self.transport.setTcpKeepAlive(1)

    def connectionLost(self, reason):
        redis_server_stream.sadd('ended_session', self.session_uuid)
        self.setTimeout(None)

        if not self.duplicate:
            if self.type == 254 or self.type == 2:
                redis_server_stream.srem('active_uuid_type{}:{}'.format(self.type, self.uuid), self.session_uuid)
                if not redis_server_stream.exists('active_uuid_type{}:{}'.format(self.type, self.uuid)):
                    redis_server_stream.srem('active_connection:{}'.format(self.type), self.uuid)
                    redis_server_stream.srem('active_connection_by_uuid:{}'.format(self.uuid), self.type)
                # clean extended type
                current_extended_type = redis_server_stream.hget('map:session-uuid_active_extended_type', self.session_uuid)
                if current_extended_type:
                    redis_server_stream.hdel('map:session-uuid_active_extended_type', self.session_uuid)
                    redis_server_stream.srem('active_connection_extended_type:{}'.format(self.uuid), current_extended_type)
            else:
                if self.uuid:
                    redis_server_stream.srem('active_connection:{}'.format(self.type), self.uuid)
                    redis_server_stream.srem('active_connection_by_uuid:{}'.format(self.uuid), self.type)

        if self.uuid:
            redis_server_stream.srem('map:active_connection-uuid-session_uuid:{}'.format(self.uuid), self.session_uuid)
            if not redis_server_stream.exists('active_connection_by_uuid:{}'.format(self.uuid)):
                redis_server_stream.srem('active_connection', self.uuid)
        logger.debug('Connection closed: session_uuid={}'.format(self.session_uuid))
        dict_all_connection.pop(self.session_uuid)

    def unpack_header(self, data):
        data_header = {}
        if len(data) >= header_size:
            data_header['version'] = struct.unpack('B', data[0:1])[0]
            data_header['type'] = struct.unpack('B', data[1:2])[0]
            data_header['uuid_header'] = data[2:18].hex()
            data_header['timestamp'] = struct.unpack('Q', data[18:26])[0]
            data_header['hmac_header'] = data[26:58]
            data_header['size'] = struct.unpack('I', data[58:62])[0]
        return data_header
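
    # Reference only (assumption): a sensor would build a matching 62-byte
    # header with struct.pack before filling in the HMAC field, e.g.:
    #   header = struct.pack('B', version) + struct.pack('B', d4_type) \
    #            + sensor_uuid.bytes + struct.pack('Q', timestamp) \
    #            + bytes(32) + struct.pack('I', len(payload))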

    def check_connection_validity(self, data_header):
        # blacklist ip by uuid
        if redis_server_metadata.sismember('blacklist_ip_by_uuid', data_header['uuid_header']):
            redis_server_metadata.sadd('blacklist_ip', self.ip)
            self.transport.abortConnection()
            logger.warning('Blacklisted IP by UUID={}, connection closed'.format(data_header['uuid_header']))
            return False

        # uuid blacklist
        if redis_server_metadata.sismember('blacklist_uuid', data_header['uuid_header']):
            logger.warning('Blacklisted UUID={}, connection closed'.format(data_header['uuid_header']))
            self.transport.abortConnection()
            return False

        # Check server mode
        if not server_mode_registration(data_header['uuid_header']):
            self.transport.abortConnection()
            return False

        # check temp blacklist
        if redis_server_stream.exists('temp_blacklist_uuid:{}'.format(data_header['uuid_header'])):
            logger.warning('Temporarily Blacklisted UUID={}, connection closed'.format(data_header['uuid_header']))
            redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'Error', 'Error: This UUID is temporarily blacklisted')
            self.transport.abortConnection()
            return False

        # check default size limit
        if data_header['size'] > data_default_size_limit:
            self.transport.abortConnection()
            logger.warning('Incorrect header data size: the server received more data than expected by default, expected={}, received={}, uuid={}, session_uuid={}'.format(data_default_size_limit, data_header['size'], data_header['uuid_header'], self.session_uuid))
            return False

        # Worker: Incorrect type
        if redis_server_stream.sismember('Error:IncorrectType', self.session_uuid):
            self.transport.abortConnection()
            redis_server_stream.delete('stream:{}:{}'.format(data_header['type'], self.session_uuid))
            redis_server_stream.srem('Error:IncorrectType', self.session_uuid)
            logger.warning('Incorrect type={} detected by worker, uuid={}, session_uuid={}'.format(data_header['type'], data_header['uuid_header'], self.session_uuid))
            return False
        return True

    def process_header(self, data, ip, source_port):
        if not self.buffer:
            data_header = self.unpack_header(data)
            if data_header:
                if not self.check_connection_validity(data_header):
                    return 1
                if is_valid_header(data_header['uuid_header'], data_header['type']):

                    # auto kill connection # TODO: map type
                    if self.first_connection:
                        self.first_connection = False
                        if data_header['type'] == 2:
                            redis_server_stream.sadd('active_uuid_type2:{}'.format(data_header['uuid_header']), self.session_uuid)
                        # type 254, check if previous type 2 saved
                        elif data_header['type'] == 254:
                            logger.warning('a type 2 packet must be sent, ip={} uuid={} type={} session_uuid={}'.format(ip, data_header['uuid_header'], data_header['type'], self.session_uuid))
                            redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'Error', 'Error: a type 2 packet must be sent, type={}'.format(data_header['type']))
                            self.duplicate = True
                            self.transport.abortConnection()
                            return 1
                        # accept only one type by uuid (except for type 2/254)
                        elif redis_server_stream.sismember('active_connection:{}'.format(data_header['type']), '{}'.format(data_header['uuid_header'])):
                            # same UUID-type pair already active
                            logger.warning('is using the same UUID for one type, ip={} uuid={} type={} session_uuid={}'.format(ip, data_header['uuid_header'], data_header['type'], self.session_uuid))
                            redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'Error', 'Error: This UUID is using the same UUID for one type={}'.format(data_header['type']))
                            self.duplicate = True
                            self.transport.abortConnection()
                            return 1

                        self.type = data_header['type']
                        self.uuid = data_header['uuid_header']
                        # worker entry point: map type:session_uuid
                        redis_server_stream.sadd('session_uuid:{}'.format(data_header['type']), self.session_uuid.encode())
                        ## save active connection ##
                        # active connection
                        redis_server_stream.sadd('active_connection:{}'.format(self.type), self.uuid)
                        redis_server_stream.sadd('active_connection_by_uuid:{}'.format(self.uuid), self.type)
                        redis_server_stream.sadd('active_connection', self.uuid)
                        # map session_uuid/uuid
                        redis_server_stream.sadd('map:active_connection-uuid-session_uuid:{}'.format(self.uuid), self.session_uuid)
                        # map all types by uuid ## TODO: # FIXME: put me in workers ??????
                        redis_server_metadata.sadd('all_types_by_uuid:{}'.format(data_header['uuid_header']), data_header['type'])
                        ## ##

                    # check if type changed
                    if self.data_saved:
                        # type change detected
                        if self.type != data_header['type']:
                            # Meta types
                            if self.type == 2 and data_header['type'] == 254:
                                self.update_stream_type = True
                                self.type = data_header['type']
                                #redis_server_stream.hdel('map-type:session_uuid-uuid:2', self.session_uuid) # # TODO: to remove / refactor
                                redis_server_stream.srem('active_uuid_type2:{}'.format(self.uuid), self.session_uuid)
                                # remove type 2 connection
                                if not redis_server_stream.exists('active_uuid_type2:{}'.format(self.uuid)):
                                    redis_server_stream.srem('active_connection:2', self.uuid)
                                    redis_server_stream.srem('active_connection_by_uuid:{}'.format(self.uuid), 2)
                                ## save active connection ##
                                # active connection
                                redis_server_stream.sadd('active_connection:{}'.format(self.type), self.uuid)
                                redis_server_stream.sadd('active_connection_by_uuid:{}'.format(self.uuid), self.type)
                                redis_server_stream.sadd('active_connection', self.uuid)
                                redis_server_stream.sadd('active_uuid_type254:{}'.format(self.uuid), self.session_uuid)
                                # map all types by uuid ## TODO: # FIXME: put me in workers ??????
                                redis_server_metadata.sadd('all_types_by_uuid:{}'.format(data_header['uuid_header']), data_header['type'])
                                ## ##
                                #redis_server_stream.hset('map-type:session_uuid-uuid:{}'.format(data_header['type']), self.session_uuid, data_header['uuid_header'])
                            # Type Error
                            else:
                                logger.warning('Unexpected type change, type={} new type={}, ip={} uuid={} session_uuid={}'.format(self.type, data_header['type'], ip, data_header['uuid_header'], self.session_uuid))
                                redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'Error', 'Error: Unexpected type change type={}, new type={}'.format(self.type, data_header['type']))
                                self.transport.abortConnection()
                                return 1
                        # check if the uuid is the same
                        if self.uuid != data_header['uuid_header']:
                            logger.warning('The uuid changed during the connection, ip={} uuid={} type={} session_uuid={} new_uuid={}'.format(ip, self.uuid, data_header['type'], self.session_uuid, data_header['uuid_header']))
                            redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'Error', 'Error: The uuid changed, new_uuid={}'.format(data_header['uuid_header']))
                            self.transport.abortConnection()
                            return 1
                            ## TODO: ban ?

                    # check data size
                    if data_header['size'] == (len(data) - header_size):
                        res = self.process_d4_data(data, data_header, ip)
                        # Error detected, kill connection
                        if res == 1:
                            return 1
                    # multiple d4 headers
                    elif data_header['size'] < (len(data) - header_size):
                        next_data = data[data_header['size'] + header_size:]
                        data = data[:data_header['size'] + header_size]
                        #print('------------------------------------------------')
                        #print(data)
                        #print()
                        #print(next_data)
                        res = self.process_d4_data(data, data_header, ip)
                        # Error detected, kill connection
                        if res == 1:
                            return 1
                        # process next d4 header
                        self.process_header(next_data, ip, source_port)
                    # data_header['size'] > (len(data) - header_size)
                    # buffer the data
                    else:
                        #print('**********************************************************')
                        #print(data)
                        #print(data_header['size'])
                        #print((len(data) - header_size))
                        self.buffer += data
                else:
                    if len(data) < header_size:
                        self.buffer += data
                    else:
                        print('discard data')
                        print(data_header)
                        print(data)
                        logger.warning('Invalid Header, uuid={}, session_uuid={}'.format(data_header['uuid_header'], self.session_uuid))
            else:
                if len(data) < header_size:
                    self.buffer += data
                    #logger.debug('Not enough data received, the header is incomplete, pushing data to buffer, session_uuid={}, data_received={}'.format(self.session_uuid, len(data)))
                else:
                    print('error discard data')
                    print(data_header)
                    print(data)
                    logger.warning('Error unpacking header: incorrect format, session_uuid={}'.format(self.session_uuid))
        # not a header
        else:
            # add previous data
            if len(data) < header_size:
                self.buffer += data
                #print(self.buffer)
                #print(len(self.buffer))
                #todo check if valid header before adding ?
            else:
                data = self.buffer + data
                #print('()()()()()()()()()')
                #print(data)
                #print()
                self.buffer = b''
                self.process_header(data, ip, source_port)
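
    # process_header() recurses when several D4 packets arrive in one TCP
    # segment: the first header+payload is consumed, the remainder is fed
    # back through process_header(); incomplete headers wait in self.buffer.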

    def process_d4_data(self, data, data_header, ip):
        # empty buffer
        self.buffer = b''
        # set hmac_header to 0
        data = data.replace(data_header['hmac_header'], hmac_reset, 1)
        if self.hmac_key is None:
            self.hmac_key = redis_server_metadata.hget('metadata_uuid:{}'.format(data_header['uuid_header']), 'hmac_key')
            if self.hmac_key is None:
                self.hmac_key = redis_server_metadata.get('server:hmac_default_key')

        HMAC = hmac.new(self.hmac_key, msg=data, digestmod='sha256')
        data_header['hmac_header'] = data_header['hmac_header'].hex()

        ### Debug ###
        #print('hexdigest: {}'.format(HMAC.hexdigest()))
        #print('version: {}'.format(data_header['version']))
        #print('type: {}'.format(data_header['type']))
        #print('uuid: {}'.format(data_header['uuid_header']))
        #print('timestamp: {}'.format(data_header['timestamp']))
        #print('hmac: {}'.format(data_header['hmac_header']))
        #print('size: {}'.format(data_header['size']))
        ### ###

        # hmac match
        if data_header['hmac_header'] == HMAC.hexdigest():
            if not self.stream_max_size:
                temp = redis_server_metadata.hget('stream_max_size_by_uuid', data_header['uuid_header'])
                if temp is not None:
                    self.stream_max_size = int(temp)
                else:
                    self.stream_max_size = default_max_entries_by_stream

            date = datetime.datetime.now().strftime("%Y%m%d")
            if redis_server_stream.xlen('stream:{}:{}'.format(data_header['type'], self.session_uuid)) < self.stream_max_size:
                # Clean Error Message
                redis_server_metadata.hdel('metadata_uuid:{}'.format(data_header['uuid_header']), 'Error')

                redis_server_stream.xadd('stream:{}:{}'.format(data_header['type'], self.session_uuid), {'message': data[header_size:], 'uuid': data_header['uuid_header'], 'timestamp': data_header['timestamp'], 'version': data_header['version']})

                # daily stats
                redis_server_metadata.zincrby('stat_uuid_ip:{}:{}'.format(date, data_header['uuid_header']), 1, ip)
                redis_server_metadata.zincrby('stat_ip_uuid:{}:{}'.format(date, ip), 1, data_header['uuid_header'])
                redis_server_metadata.zincrby('daily_uuid:{}'.format(date), 1, data_header['uuid_header'])
                redis_server_metadata.zincrby('daily_ip:{}'.format(date), 1, ip)
                redis_server_metadata.zincrby('daily_type:{}'.format(date), 1, data_header['type'])
                redis_server_metadata.zincrby('stat_type_uuid:{}:{}'.format(date, data_header['type']), 1, data_header['uuid_header'])
                redis_server_metadata.zincrby('stat_uuid_type:{}:{}'.format(date, data_header['uuid_header']), 1, data_header['type'])

                # first seen / last seen
                if not redis_server_metadata.hexists('metadata_uuid:{}'.format(data_header['uuid_header']), 'first_seen'):
                    redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'first_seen', data_header['timestamp'])
                redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'last_seen', data_header['timestamp'])
                redis_server_metadata.hset('metadata_type_by_uuid:{}:{}'.format(data_header['uuid_header'], data_header['type']), 'last_seen', data_header['timestamp'])

                if not self.data_saved:
                    # UUID IP: ## TODO: use d4 timestamp ?
                    redis_server_metadata.lpush('list_uuid_ip:{}'.format(data_header['uuid_header']), '{}-{}'.format(ip, datetime.datetime.now().strftime("%Y%m%d%H%M%S")))
                    redis_server_metadata.ltrim('list_uuid_ip:{}'.format(data_header['uuid_header']), 0, 15)
                    self.data_saved = True

                if self.update_stream_type:
                    if not redis_server_metadata.hexists('metadata_type_by_uuid:{}:{}'.format(data_header['uuid_header'], data_header['type']), 'first_seen'):
                        redis_server_metadata.hset('metadata_type_by_uuid:{}:{}'.format(data_header['uuid_header'], data_header['type']), 'first_seen', data_header['timestamp'])
                    self.update_stream_type = False
                return 0
            else:
                logger.warning("stream exceeded max entries limit, uuid={}, session_uuid={}, type={}".format(data_header['uuid_header'], self.session_uuid, data_header['type']))
                ## TODO: FIXME
                redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'Error', 'Error: stream exceeded max entries limit')
                self.transport.abortConnection()
                return 1
        else:
            print('hmac do not match')
            print(data)
            logger.debug("HMAC mismatch, uuid={}, session_uuid={}".format(data_header['uuid_header'], self.session_uuid))
            ## TODO: FIXME
            redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'Error', "Error: HMAC mismatch")
            self.transport.abortConnection()
            return 1
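
    # The HMAC check mirrors the sensor side: the 32-byte hmac field is
    # zeroed (hmac_reset) before computing HMAC-SHA256 over the whole packet,
    # using the per-uuid 'hmac_key' from metadata if set, else the server
    # default key.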


def main(reactor):
    log.startLogging(sys.stdout)
    try:
        certData = getModule(__name__).filePath.sibling('server.pem').getContent()
    except FileNotFoundError as e:
        print('Error, pem file not found')
        print(e)
        sys.exit(1)
    certificate = ssl.PrivateCertificate.loadPEM(certData)
    factory = protocol.Factory.forProtocol(D4_Server)
    # use interface to support both IPv4 and IPv6
    reactor.listenSSL(4443, factory, certificate.options(), interface='::')
    return defer.Deferred()
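
# Reference only (assumption): connectivity to the TLS listener can be
# checked with: openssl s_client -connect localhost:4443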


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', help='logging level, e.g. 10=DEBUG, 20=INFO, 30=WARNING', type=int, default=30)
    args = parser.parse_args()

    if not redis_server_metadata.exists('first_date'):
        redis_server_metadata.set('first_date', datetime.datetime.now().strftime("%Y%m%d"))

    logs_dir = 'logs'
    if not os.path.isdir(logs_dir):
        os.makedirs(logs_dir)

    log_filename = 'logs/d4-server.log'
    logger = logging.getLogger()
    #formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    handler_log = logging.handlers.TimedRotatingFileHandler(log_filename, when="midnight", interval=1)
    handler_log.suffix = '%Y-%m-%d.log'
    handler_log.setFormatter(formatter)
    logger.addHandler(handler_log)
    logger.setLevel(args.verbose)

    # get file config
    config_file_server = os.path.join(os.environ['D4_HOME'], 'configs/server.conf')
    config_server = configparser.ConfigParser()
    config_server.read(config_file_server)
    # get server_mode
    server_mode = config_server['D4_Server'].get('server_mode')
    if server_mode not in all_server_modes:
        print('Error: incorrect server_mode')
        logger.critical('Error: incorrect server_mode')
        sys.exit(1)
    logger.info('Server mode: {}'.format(server_mode))
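
    # Example configs/server.conf (illustrative; only the key read above is shown):
    #   [D4_Server]
    #   server_mode = registration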

    logger.info('Launching Server ...')

    task.react(main)