mirror of https://github.com/D4-project/d4-core
chg: [server, workers] add new redis metadata + fix log rotation + add control on stream size + add control on accepted types
parent
b6c48d95e3
commit
e24c5e4149
|
@ -23,25 +23,48 @@ from twisted.protocols.policies import TimeoutMixin
|
||||||
hmac_reset = bytearray(32)
|
hmac_reset = bytearray(32)
|
||||||
hmac_key = b'private key to change'
|
hmac_key = b'private key to change'
|
||||||
|
|
||||||
|
accepted_type = [1, 4]
|
||||||
|
|
||||||
timeout_time = 30
|
timeout_time = 30
|
||||||
|
|
||||||
header_size = 62
|
header_size = 62
|
||||||
|
|
||||||
data_default_size_limit = 100000
|
data_default_size_limit = 100000
|
||||||
|
default_max_entries_by_stream = 10000
|
||||||
|
|
||||||
host_redis="localhost"
|
host_redis_stream = "localhost"
|
||||||
port_redis=6379
|
port_redis_stream = 6379
|
||||||
redis_server = redis.StrictRedis(
|
|
||||||
host=host_redis,
|
host_redis_metadata = "localhost"
|
||||||
port=port_redis,
|
port_redis_metadata= 6380
|
||||||
|
|
||||||
|
redis_server_stream = redis.StrictRedis(
|
||||||
|
host=host_redis_stream,
|
||||||
|
port=port_redis_stream,
|
||||||
|
db=0)
|
||||||
|
|
||||||
|
redis_server_metadata = redis.StrictRedis(
|
||||||
|
host=host_redis_metadata,
|
||||||
|
port=port_redis_metadata,
|
||||||
db=0)
|
db=0)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
redis_server.ping()
|
redis_server_stream.ping()
|
||||||
except redis.exceptions.ConnectionError:
|
except redis.exceptions.ConnectionError:
|
||||||
print('Error: Redis server {}:{}, ConnectionError'.format(host_redis, port_redis))
|
print('Error: Redis server {}:{}, ConnectionError'.format(host_redis_stream, port_redis_stream))
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
try:
|
||||||
|
redis_server_metadata.ping()
|
||||||
|
except redis.exceptions.ConnectionError:
|
||||||
|
print('Error: Redis server {}:{}, ConnectionError'.format(host_redis_metadata, port_redis_metadata))
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# init redis_server_metadata
|
||||||
|
redis_server_metadata.delete('server:accepted_type')
|
||||||
|
for type in accepted_type:
|
||||||
|
redis_server_metadata.sadd('server:accepted_type', type)
|
||||||
|
|
||||||
class Echo(Protocol, TimeoutMixin):
|
class Echo(Protocol, TimeoutMixin):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
@ -49,13 +72,14 @@ class Echo(Protocol, TimeoutMixin):
|
||||||
self.setTimeout(timeout_time)
|
self.setTimeout(timeout_time)
|
||||||
self.session_uuid = str(uuid.uuid4())
|
self.session_uuid = str(uuid.uuid4())
|
||||||
self.data_saved = False
|
self.data_saved = False
|
||||||
|
self.stream_max_size = None
|
||||||
logger.debug('New session: session_uuid={}'.format(self.session_uuid))
|
logger.debug('New session: session_uuid={}'.format(self.session_uuid))
|
||||||
|
|
||||||
def dataReceived(self, data):
|
def dataReceived(self, data):
|
||||||
self.resetTimeout()
|
self.resetTimeout()
|
||||||
ip, source_port = self.transport.client
|
ip, source_port = self.transport.client
|
||||||
# check blacklisted_ip
|
# check blacklisted_ip
|
||||||
if redis_server.sismember('blacklist_ip', ip):
|
if redis_server_metadata.sismember('blacklist_ip', ip):
|
||||||
self.transport.abortConnection()
|
self.transport.abortConnection()
|
||||||
logger.warning('Blacklisted IP={}, connection closed'.format(ip))
|
logger.warning('Blacklisted IP={}, connection closed'.format(ip))
|
||||||
|
|
||||||
|
@ -67,7 +91,7 @@ class Echo(Protocol, TimeoutMixin):
|
||||||
logger.debug('buffer timeout, session_uuid={}'.format(self.session_uuid))
|
logger.debug('buffer timeout, session_uuid={}'.format(self.session_uuid))
|
||||||
|
|
||||||
def connectionLost(self, reason):
|
def connectionLost(self, reason):
|
||||||
redis_server.sadd('ended_session', self.session_uuid)
|
redis_server_stream.sadd('ended_session', self.session_uuid)
|
||||||
self.setTimeout(None)
|
self.setTimeout(None)
|
||||||
logger.debug('Connection closed: session_uuid={}'.format(self.session_uuid))
|
logger.debug('Connection closed: session_uuid={}'.format(self.session_uuid))
|
||||||
|
|
||||||
|
@ -82,7 +106,7 @@ class Echo(Protocol, TimeoutMixin):
|
||||||
data_header['size'] = struct.unpack('I', data[58:62])[0]
|
data_header['size'] = struct.unpack('I', data[58:62])[0]
|
||||||
|
|
||||||
# uuid blacklist
|
# uuid blacklist
|
||||||
if redis_server.sismember('blacklist_uuid', data_header['uuid_header']):
|
if redis_server_metadata.sismember('blacklist_uuid', data_header['uuid_header']):
|
||||||
self.transport.abortConnection()
|
self.transport.abortConnection()
|
||||||
logger.warning('Blacklisted UUID={}, connection closed'.format(data_header['uuid_header']))
|
logger.warning('Blacklisted UUID={}, connection closed'.format(data_header['uuid_header']))
|
||||||
|
|
||||||
|
@ -102,9 +126,12 @@ class Echo(Protocol, TimeoutMixin):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# # TODO: check timestamp
|
# # TODO: check timestamp
|
||||||
def is_valid_header(self, uuid_to_check):
|
def is_valid_header(self, uuid_to_check, type):
|
||||||
if self.is_valid_uuid_v4(uuid_to_check):
|
if self.is_valid_uuid_v4(uuid_to_check):
|
||||||
return True
|
if redis_server_metadata.sismember('server:accepted_type', type):
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
logger.warning('Invalid type, the server don\'t accept this type: {}, uuid={}, session_uuid={}'.format(type, uuid_to_check, self.session_uuid))
|
||||||
else:
|
else:
|
||||||
logger.info('Invalid Header, uuid={}, session_uuid={}'.format(uuid_to_check, self.session_uuid))
|
logger.info('Invalid Header, uuid={}, session_uuid={}'.format(uuid_to_check, self.session_uuid))
|
||||||
return False
|
return False
|
||||||
|
@ -113,7 +140,7 @@ class Echo(Protocol, TimeoutMixin):
|
||||||
if not self.buffer:
|
if not self.buffer:
|
||||||
data_header = self.unpack_header(data)
|
data_header = self.unpack_header(data)
|
||||||
if data_header:
|
if data_header:
|
||||||
if self.is_valid_header(data_header['uuid_header']):
|
if self.is_valid_header(data_header['uuid_header'], data_header['type']):
|
||||||
# check data size
|
# check data size
|
||||||
if data_header['size'] == (len(data) - header_size):
|
if data_header['size'] == (len(data) - header_size):
|
||||||
self.process_d4_data(data, data_header, ip)
|
self.process_d4_data(data, data_header, ip)
|
||||||
|
@ -191,23 +218,35 @@ class Echo(Protocol, TimeoutMixin):
|
||||||
|
|
||||||
# hmac match
|
# hmac match
|
||||||
if data_header['hmac_header'] == HMAC.hexdigest():
|
if data_header['hmac_header'] == HMAC.hexdigest():
|
||||||
|
if not self.stream_max_size:
|
||||||
|
temp = redis_server_metadata.hget('stream_max_size_by_uuid', data_header['uuid_header'])
|
||||||
|
if temp is not None:
|
||||||
|
self.stream_max_size = int(temp)
|
||||||
|
else:
|
||||||
|
self.stream_max_size = default_max_entries_by_stream
|
||||||
|
|
||||||
date = datetime.datetime.now().strftime("%Y%m%d")
|
date = datetime.datetime.now().strftime("%Y%m%d")
|
||||||
redis_server.xadd('stream:{}:{}'.format(data_header['type'], self.session_uuid), {'message': data[header_size:], 'uuid': data_header['uuid_header'], 'timestamp': data_header['timestamp'], 'version': data_header['version']})
|
if redis_server_stream.xlen('stream:{}:{}'.format(data_header['type'], self.session_uuid)) < self.stream_max_size:
|
||||||
redis_server.zincrby('stat_uuid_ip:{}:{}'.format(date, data_header['uuid_header']), 1, ip)
|
|
||||||
redis_server.zincrby('stat_ip_uuid:{}:{}'.format(date, ip), 1, data_header['uuid_header'])
|
|
||||||
|
|
||||||
redis_server.sadd('daily_uuid:{}'.format(date), data_header['uuid_header'])
|
redis_server_stream.xadd('stream:{}:{}'.format(data_header['type'], self.session_uuid), {'message': data[header_size:], 'uuid': data_header['uuid_header'], 'timestamp': data_header['timestamp'], 'version': data_header['version']})
|
||||||
redis_server.sadd('daily_ip:{}'.format(date), ip)
|
redis_server_metadata.zincrby('stat_uuid_ip:{}:{}'.format(date, data_header['uuid_header']), 1, ip)
|
||||||
|
redis_server_metadata.zincrby('stat_ip_uuid:{}:{}'.format(date, ip), 1, data_header['uuid_header'])
|
||||||
|
|
||||||
#
|
redis_server_metadata.zincrby('daily_uuid:{}'.format(date), 1, data_header['uuid_header'])
|
||||||
if not redis_server.hexists('metadata_uuid:{}'.format(data_header['uuid_header']), 'first_seen'):
|
redis_server_metadata.zincrby('daily_ip:{}'.format(date), 1, ip)
|
||||||
redis_server.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'first_seen', data_header['timestamp'])
|
|
||||||
redis_server.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'last_seen', data_header['timestamp'])
|
|
||||||
|
|
||||||
if not self.data_saved:
|
#
|
||||||
redis_server.sadd('session_uuid:{}'.format(data_header['type']), self.session_uuid.encode())
|
if not redis_server_metadata.hexists('metadata_uuid:{}'.format(data_header['uuid_header']), 'first_seen'):
|
||||||
redis_server.hset('map-type:session_uuid-uuid:{}'.format(data_header['type']), self.session_uuid, data_header['uuid_header'])
|
redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'first_seen', data_header['timestamp'])
|
||||||
self.data_saved = True
|
redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'last_seen', data_header['timestamp'])
|
||||||
|
|
||||||
|
if not self.data_saved:
|
||||||
|
redis_server_stream.sadd('session_uuid:{}'.format(data_header['type']), self.session_uuid.encode())
|
||||||
|
redis_server_stream.hset('map-type:session_uuid-uuid:{}'.format(data_header['type']), self.session_uuid, data_header['uuid_header'])
|
||||||
|
self.data_saved = True
|
||||||
|
else:
|
||||||
|
logger.warning("stream exceed max entries limit, uuid={}, session_uuid={}, type={}".format(data_header['uuid_header'], self.session_uuid, data_header['type']))
|
||||||
|
self.transport.abortConnection()
|
||||||
else:
|
else:
|
||||||
print('hmac do not match')
|
print('hmac do not match')
|
||||||
print(data)
|
print(data)
|
||||||
|
@ -238,12 +277,12 @@ if __name__ == "__main__":
|
||||||
if not os.path.isdir(logs_dir):
|
if not os.path.isdir(logs_dir):
|
||||||
os.makedirs(logs_dir)
|
os.makedirs(logs_dir)
|
||||||
|
|
||||||
log_filename = 'logs/d4-server-logs.log'
|
log_filename = 'logs/d4-server.log'
|
||||||
logger = logging.getLogger()
|
logger = logging.getLogger()
|
||||||
#formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
#formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||||
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
|
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
|
||||||
handler_log = logging.handlers.TimedRotatingFileHandler(log_filename, when="midnight", interval=1)
|
handler_log = logging.handlers.TimedRotatingFileHandler(log_filename, when="midnight", interval=1)
|
||||||
handler_log.suffix = '%Y-%m-%d-{}'.format(log_filename)
|
handler_log.suffix = '%Y-%m-%d.log'
|
||||||
handler_log.setFormatter(formatter)
|
handler_log.setFormatter(formatter)
|
||||||
logger.addHandler(handler_log)
|
logger.addHandler(handler_log)
|
||||||
logger.setLevel(args.verbose)
|
logger.setLevel(args.verbose)
|
||||||
|
|
|
@ -12,9 +12,12 @@ def data_incorrect_format(session_uuid):
|
||||||
print('Incorrect format')
|
print('Incorrect format')
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
redis_server = redis.StrictRedis(
|
host_redis_stream = "localhost"
|
||||||
host="localhost",
|
port_redis_stream = 6379
|
||||||
port=6379,
|
|
||||||
|
redis_server_stream = redis.StrictRedis(
|
||||||
|
host=host_redis_stream,
|
||||||
|
port=port_redis_stream,
|
||||||
db=0)
|
db=0)
|
||||||
|
|
||||||
type = 1
|
type = 1
|
||||||
|
@ -31,11 +34,9 @@ if __name__ == "__main__":
|
||||||
|
|
||||||
session_uuid = sys.argv[1]
|
session_uuid = sys.argv[1]
|
||||||
stream_name = 'stream:{}:{}'.format(type, session_uuid)
|
stream_name = 'stream:{}:{}'.format(type, session_uuid)
|
||||||
consumer_name = 'consumer:{}:{}'.format(type, session_uuid)
|
|
||||||
group_name = 'workers:{}:{}'.format(type, session_uuid)
|
|
||||||
id = '0'
|
id = '0'
|
||||||
|
|
||||||
res = redis_server.xread({stream_name: id}, count=1)
|
res = redis_server_stream.xread({stream_name: id}, count=1)
|
||||||
if res:
|
if res:
|
||||||
uuid = res[0][1][0][1][b'uuid'].decode()
|
uuid = res[0][1][0][1][b'uuid'].decode()
|
||||||
date = datetime.datetime.now().strftime("%Y%m%d")
|
date = datetime.datetime.now().strftime("%Y%m%d")
|
||||||
|
@ -43,19 +44,19 @@ if __name__ == "__main__":
|
||||||
rel_path = os.path.join(tcpdump_path, date[0:4], date[4:6], date[6:8])
|
rel_path = os.path.join(tcpdump_path, date[0:4], date[4:6], date[6:8])
|
||||||
if not os.path.isdir(rel_path):
|
if not os.path.isdir(rel_path):
|
||||||
os.makedirs(rel_path)
|
os.makedirs(rel_path)
|
||||||
|
print('---- worker launched, uuid={} session_uuid={}'.format(uuid, session_uuid))
|
||||||
else:
|
else:
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
print('Incorrect message')
|
print('Incorrect message')
|
||||||
redis_server.sadd('working_session_uuid:{}'.format(type), session_uuid)
|
redis_server_stream.sadd('working_session_uuid:{}'.format(type), session_uuid)
|
||||||
|
|
||||||
#LAUNCH a tcpdump
|
#LAUNCH a tcpdump
|
||||||
process = subprocess.Popen(["tcpdump", '-n', '-r', '-', '-G', tcp_dump_cycle, '-w', '{}/%Y/%m/%d/{}-%Y-%m-%d-%H%M%S.cap'.format(tcpdump_path, uuid)], stdin=subprocess.PIPE)
|
process = subprocess.Popen(["tcpdump", '-n', '-r', '-', '-G', tcp_dump_cycle, '-w', '{}/%Y/%m/%d/{}-%Y-%m-%d-%H%M%S.cap'.format(tcpdump_path, uuid)], stdin=subprocess.PIPE)
|
||||||
nb_save = 0
|
nb_save = 0
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
#redis_server.sadd('working_session_uuid:{}'.format(type), session_uuid)
|
|
||||||
|
|
||||||
res = redis_server.xread({stream_name: id}, count=1)
|
res = redis_server_stream.xread({stream_name: id}, count=1)
|
||||||
if res:
|
if res:
|
||||||
new_id = res[0][1][0][0].decode()
|
new_id = res[0][1][0][0].decode()
|
||||||
if id != new_id:
|
if id != new_id:
|
||||||
|
@ -83,14 +84,14 @@ if __name__ == "__main__":
|
||||||
nb_save += 1
|
nb_save += 1
|
||||||
|
|
||||||
if nb_save > stream_buffer:
|
if nb_save > stream_buffer:
|
||||||
for id in id_to_delete:
|
for id_saved in id_to_delete:
|
||||||
redis_server.xdel(stream_name, id)
|
redis_server_stream.xdel(stream_name, id_saved)
|
||||||
id_to_delete = []
|
id_to_delete = []
|
||||||
nb_save = 0
|
nb_save = 0
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# sucess, all data are saved
|
# sucess, all data are saved
|
||||||
if redis_server.sismember('ended_session', session_uuid):
|
if redis_server_stream.sismember('ended_session', session_uuid):
|
||||||
out, err = process.communicate(timeout= 0.5)
|
out, err = process.communicate(timeout= 0.5)
|
||||||
#print(out)
|
#print(out)
|
||||||
if err == b'tcpdump: unknown file format\n':
|
if err == b'tcpdump: unknown file format\n':
|
||||||
|
@ -99,14 +100,14 @@ if __name__ == "__main__":
|
||||||
print(err)
|
print(err)
|
||||||
|
|
||||||
#print(process.stderr.read())
|
#print(process.stderr.read())
|
||||||
redis_server.srem('ended_session', session_uuid)
|
redis_server_stream.srem('ended_session', session_uuid)
|
||||||
redis_server.srem('session_uuid:{}'.format(type), session_uuid)
|
redis_server_stream.srem('session_uuid:{}'.format(type), session_uuid)
|
||||||
redis_server.srem('working_session_uuid:{}'.format(type), session_uuid)
|
redis_server_stream.srem('working_session_uuid:{}'.format(type), session_uuid)
|
||||||
redis_server.hdel('map-type:session_uuid-uuid:{}'.format(type), session_uuid)
|
redis_server_stream.hdel('map-type:session_uuid-uuid:{}'.format(type), session_uuid)
|
||||||
redis_server.delete(stream_name)
|
redis_server_stream.delete(stream_name)
|
||||||
# make sure that tcpdump can save all datas
|
# make sure that tcpdump can save all datas
|
||||||
time.sleep(int(tcp_dump_cycle) + 1)
|
time.sleep(10)
|
||||||
print('tcpdump: {} Done'.format(session_uuid))
|
print('---- tcpdump DONE, uuid={} session_uuid={}'.format(uuid, session_uuid))
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
else:
|
else:
|
||||||
time.sleep(10)
|
time.sleep(10)
|
||||||
|
|
|
@ -6,29 +6,32 @@ import time
|
||||||
import redis
|
import redis
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
redis_server = redis.StrictRedis(
|
host_redis_stream = "localhost"
|
||||||
host="localhost",
|
port_redis_stream = 6379
|
||||||
port=6379,
|
|
||||||
|
redis_server_stream = redis.StrictRedis(
|
||||||
|
host=host_redis_stream,
|
||||||
|
port=port_redis_stream,
|
||||||
db=0)
|
db=0)
|
||||||
type = 1
|
type = 1
|
||||||
|
|
||||||
try:
|
try:
|
||||||
redis_server.ping()
|
redis_server_stream.ping()
|
||||||
except redis.exceptions.ConnectionError:
|
except redis.exceptions.ConnectionError:
|
||||||
print('Error: Redis server {}:{}, ConnectionError'.format(host_redis, port_redis))
|
print('Error: Redis server {}:{}, ConnectionError'.format(host_redis, port_redis))
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
stream_name = 'stream:{}'.format(type)
|
stream_name = 'stream:{}'.format(type)
|
||||||
redis_server.delete('working_session_uuid:{}'.format(type))
|
redis_server_stream.delete('working_session_uuid:{}'.format(type))
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
for session_uuid in redis_server.smembers('session_uuid:{}'.format(type)):
|
for session_uuid in redis_server_stream.smembers('session_uuid:{}'.format(type)):
|
||||||
session_uuid = session_uuid.decode()
|
session_uuid = session_uuid.decode()
|
||||||
if not redis_server.sismember('working_session_uuid:{}'.format(type), session_uuid):
|
if not redis_server_stream.sismember('working_session_uuid:{}'.format(type), session_uuid):
|
||||||
|
|
||||||
process = subprocess.Popen(['./worker.py', session_uuid])
|
process = subprocess.Popen(['./worker.py', session_uuid])
|
||||||
print('New worker launched: {}'.format(session_uuid))
|
print('Launching new worker{} ... session_uuid={}'.format(type, session_uuid))
|
||||||
|
|
||||||
|
|
||||||
#print('.')
|
#print('.')
|
||||||
|
|
|
@ -11,9 +11,12 @@ def data_incorrect_format(session_uuid):
|
||||||
print('Incorrect format')
|
print('Incorrect format')
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
redis_server = redis.StrictRedis(
|
host_redis_stream = "localhost"
|
||||||
host="localhost",
|
port_redis_stream = 6379
|
||||||
port=6379,
|
|
||||||
|
redis_server_stream = redis.StrictRedis(
|
||||||
|
host=host_redis_stream,
|
||||||
|
port=port_redis_stream,
|
||||||
db=0)
|
db=0)
|
||||||
|
|
||||||
type = 4
|
type = 4
|
||||||
|
@ -29,7 +32,9 @@ if __name__ == "__main__":
|
||||||
stream_name = 'stream:{}:{}'.format(type, session_uuid)
|
stream_name = 'stream:{}:{}'.format(type, session_uuid)
|
||||||
id = '0'
|
id = '0'
|
||||||
|
|
||||||
res = redis_server.xread({stream_name: id}, count=1)
|
redis_server_stream.sadd('working_session_uuid:{}'.format(type), session_uuid)
|
||||||
|
|
||||||
|
res = redis_server_stream.xread({stream_name: id}, count=1)
|
||||||
if res:
|
if res:
|
||||||
date = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
|
date = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
|
||||||
uuid = res[0][1][0][1][b'uuid'].decode()
|
uuid = res[0][1][0][1][b'uuid'].decode()
|
||||||
|
@ -39,17 +44,17 @@ if __name__ == "__main__":
|
||||||
os.makedirs(dir_path)
|
os.makedirs(dir_path)
|
||||||
filename = '{}-{}-{}-{}-{}.dnscap.txt'.format(uuid, date[0:4], date[4:6], date[6:8], date[8:14])
|
filename = '{}-{}-{}-{}-{}.dnscap.txt'.format(uuid, date[0:4], date[4:6], date[6:8], date[8:14])
|
||||||
rel_path = os.path.join(dir_path, filename)
|
rel_path = os.path.join(dir_path, filename)
|
||||||
|
print('---- worker launched, uuid={} session_uuid={}'.format(uuid, session_uuid))
|
||||||
else:
|
else:
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
print('Incorrect message')
|
print('Incorrect message')
|
||||||
redis_server.sadd('working_session_uuid:{}'.format(type), session_uuid)
|
|
||||||
|
|
||||||
time_file = time.time()
|
time_file = time.time()
|
||||||
rotate_file = False
|
rotate_file = False
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
|
||||||
res = redis_server.xread({stream_name: id}, count=1)
|
res = redis_server_stream.xread({stream_name: id}, count=1)
|
||||||
if res:
|
if res:
|
||||||
new_id = res[0][1][0][0].decode()
|
new_id = res[0][1][0][0].decode()
|
||||||
if id != new_id:
|
if id != new_id:
|
||||||
|
@ -83,17 +88,17 @@ if __name__ == "__main__":
|
||||||
with open(rel_path, 'ab') as f:
|
with open(rel_path, 'ab') as f:
|
||||||
f.write(data[b'message'])
|
f.write(data[b'message'])
|
||||||
|
|
||||||
redis_server.xdel(stream_name, id)
|
redis_server_stream.xdel(stream_name, id)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# sucess, all data are saved
|
# sucess, all data are saved
|
||||||
if redis_server.sismember('ended_session', session_uuid):
|
if redis_server_stream.sismember('ended_session', session_uuid):
|
||||||
redis_server.srem('ended_session', session_uuid)
|
redis_server_stream.srem('ended_session', session_uuid)
|
||||||
redis_server.srem('session_uuid:{}'.format(type), session_uuid)
|
redis_server_stream.srem('session_uuid:{}'.format(type), session_uuid)
|
||||||
redis_server.srem('working_session_uuid:{}'.format(type), session_uuid)
|
redis_server_stream.srem('working_session_uuid:{}'.format(type), session_uuid)
|
||||||
redis_server.hdel('map-type:session_uuid-uuid:{}'.format(type), session_uuid)
|
redis_server_stream.hdel('map-type:session_uuid-uuid:{}'.format(type), session_uuid)
|
||||||
redis_server.delete(stream_name)
|
redis_server_stream.delete(stream_name)
|
||||||
print('dnscap: {} Done'.format(session_uuid))
|
print('---- dnscap DONE, uuid={} session_uuid={}'.format(uuid, session_uuid))
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
else:
|
else:
|
||||||
time.sleep(10)
|
time.sleep(10)
|
||||||
|
|
|
@ -6,30 +6,32 @@ import time
|
||||||
import redis
|
import redis
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
redis_server = redis.StrictRedis(
|
host_redis_stream = "localhost"
|
||||||
host="localhost",
|
port_redis_stream = 6379
|
||||||
port=6379,
|
|
||||||
|
redis_server_stream = redis.StrictRedis(
|
||||||
|
host=host_redis_stream,
|
||||||
|
port=port_redis_stream,
|
||||||
db=0)
|
db=0)
|
||||||
type = 4
|
type = 4
|
||||||
|
|
||||||
try:
|
try:
|
||||||
redis_server.ping()
|
redis_server_stream.ping()
|
||||||
except redis.exceptions.ConnectionError:
|
except redis.exceptions.ConnectionError:
|
||||||
print('Error: Redis server {}:{}, ConnectionError'.format(host_redis, port_redis))
|
print('Error: Redis server {}:{}, ConnectionError'.format(host_redis, port_redis))
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
stream_name = 'stream:{}'.format(type)
|
stream_name = 'stream:{}'.format(type)
|
||||||
redis_server.delete('working_session_uuid:{}'.format(type))
|
redis_server_stream.delete('working_session_uuid:{}'.format(type))
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
for session_uuid in redis_server.smembers('session_uuid:{}'.format(type)):
|
for session_uuid in redis_server_stream.smembers('session_uuid:{}'.format(type)):
|
||||||
session_uuid = session_uuid.decode()
|
session_uuid = session_uuid.decode()
|
||||||
if not redis_server.sismember('working_session_uuid:{}'.format(type), session_uuid):
|
if not redis_server_stream.sismember('working_session_uuid:{}'.format(type), session_uuid):
|
||||||
|
|
||||||
process = subprocess.Popen(['./worker.py', session_uuid])
|
process = subprocess.Popen(['./worker.py', session_uuid])
|
||||||
print('New worker launched: {}'.format(session_uuid))
|
print('Launching new worker{} ... session_uuid={}'.format(type, session_uuid))
|
||||||
|
|
||||||
|
|
||||||
#print('.')
|
#print('.')
|
||||||
time.sleep(10)
|
time.sleep(10)
|
||||||
|
|
Loading…
Reference in New Issue