diff --git a/server/server.py b/server/server.py index d7ce5b2..979986a 100755 --- a/server/server.py +++ b/server/server.py @@ -2,6 +2,7 @@ import os import sys +import uuid import hmac import stat import redis @@ -16,33 +17,40 @@ from twisted.python.modules import getModule from twisted.internet.protocol import Protocol from twisted.protocols.policies import TimeoutMixin - -from ctypes import * -from uuid import UUID - hmac_reset = bytearray(32) -hmac_key = b'private key to change\n' +hmac_key = b'private key to change' timeout_time = 30 header_size = 62 +data_default_size_limit = 100000 + +host_redis="localhost" +port_redis=6379 redis_server = redis.StrictRedis( - host="localhost", - port=6379, + host=host_redis, + port=port_redis, db=0) +try: + redis_server.ping() +except redis.exceptions.ConnectionError: + print('Error: Redis server {}:{}, ConnectionError'.format(host_redis, port_redis)) + sys.exit(1) + class Echo(Protocol, TimeoutMixin): def __init__(self): self.buffer = b'' self.setTimeout(timeout_time) + self.session_uuid = str(uuid.uuid4()) def dataReceived(self, data): self.resetTimeout() ip, source_port = self.transport.client # check blacklisted_ip - if redis_server.sismember('blacklisted_ip', ip): + if redis_server.sismember('blacklist_ip', ip): self.transport.abortConnection() #print(ip) #print(source_port) @@ -54,6 +62,10 @@ class Echo(Protocol, TimeoutMixin): self.buffer = b'' #self.transport.abortConnection() + def connectionLost(self, reason): + #print("Done") + redis_server.sadd('ended_session', self.session_uuid) + def unpack_header(self, data): data_header = {} if len(data) >= header_size: @@ -64,18 +76,26 @@ class Echo(Protocol, TimeoutMixin): data_header['hmac_header'] = data[26:58] data_header['size'] = struct.unpack('I', data[58:62])[0] + # uuid blacklist + if redis_server.sismember('blacklist_uuid', data_header['uuid_header']): + self.transport.abortConnection() + + # check default size limit + if data_header['size'] > data_default_size_limit: + self.transport.abortConnection() + return data_header def is_valid_uuid_v4(self, header_uuid): try: - uuid_test = UUID(hex=header_uuid, version=4) + uuid_test = uuid.UUID(hex=header_uuid, version=4) return uuid_test.hex == header_uuid except: return False # # TODO: check timestamp - def is_valid_header(self, uuid): - if self.is_valid_uuid_v4(uuid): + def is_valid_header(self, uuid_to_check): + if self.is_valid_uuid_v4(uuid_to_check): return True else: return False @@ -158,13 +178,13 @@ class Echo(Protocol, TimeoutMixin): #print('timestamp: {}'.format( data_header['timestamp'] )) #print('hmac: {}'.format( data_header['hmac_header'] )) #print('size: {}'.format( data_header['size'] )) - #print(d4_header) ### ### if data_header['hmac_header'] == HMAC.hexdigest(): #print('hmac match') date = datetime.datetime.now().strftime("%Y%m%d") - redis_server.xadd('stream:{}'.format(data_header['type']), {'message': data[header_size:], 'uuid': data_header['uuid_header'], 'timestamp': data_header['timestamp'], 'version': data_header['version']}) + redis_server.xadd('stream:{}:{}'.format(data_header['type'], self.session_uuid), {'message': data[header_size:], 'uuid': data_header['uuid_header'], 'timestamp': data_header['timestamp'], 'version': data_header['version']}) + redis_server.sadd('session_uuid:{}'.format(data_header['type']), self.session_uuid.encode()) redis_server.sadd('daily_uuid:{}'.format(date), data_header['uuid_header']) redis_server.zincrby('stat_uuid_ip:{}:{}'.format(date, data_header['uuid_header']), 1, ip) redis_server.sadd('daily_ip:{}'.format(date), ip) diff --git a/server/workers/workers_2/worker.py b/server/workers/workers_2/worker.py new file mode 100755 index 0000000..d03b8ff --- /dev/null +++ b/server/workers/workers_2/worker.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python3 + +import os +import sys +import time +import redis +import subprocess + +import datetime + +def data_incorrect_format(session_uuid): + print('Incorrect format') + sys.exit(1) + +redis_server = redis.StrictRedis( + host="localhost", + port=6379, + db=0) + +type = 1 +tcp_dump_cycle = '5' + +if __name__ == "__main__": + + if len(sys.argv) != 2: + print('usage:', 'Worker.py', 'session_uuid') + exit(1) + + session_uuid = sys.argv[1] + stream_name = 'stream:{}:{}'.format(type, session_uuid) + consumer_name = 'consumer:{}:{}'.format(type, session_uuid) + group_name = 'workers:{}:{}'.format(type, session_uuid) + id = '0' + + res = redis_server.xread({stream_name: id}, count=1) + #print(res) + if res: + uuid = res[0][1][0][1][b'uuid'].decode() + else: + sys.exit(1) + print('Incorrect message') + redis_server.sadd('working_session_uuid:{}'.format(type), session_uuid) + + #LAUNCH a tcpdump + #process = subprocess.Popen(["tcpdump", '-n', '-r', '-', '-G', '5', '-w', '{}/%Y/%m/%d/%H%M%S.cap'.format(uuid)], stdin=subprocess.PIPE) + process = subprocess.Popen(["tcpdump", '-n', '-r', '-', '-G', tcp_dump_cycle, '-w', '{}-%Y%m%d%H%M%S.cap'.format(uuid)], stdin=subprocess.PIPE, stderr=subprocess.PIPE) + #redis_server.xgroup_create('stream:{}:{}'.format(type, session_uuid), 'workers:{}:{}'.format(type, session_uuid)) + + while True: + #print(redis_server.xpending(stream_name, group_name)) + #redis_server.sadd('working_session_uuid:{}'.format(type), session_uuid) + + #res = redis_server.xreadgroup(group_name, consumer_name, {stream_name: '1547198181015-0'}, count=1) + res = redis_server.xread({stream_name: id}, count=1) + #print(res) + if res: + new_id = res[0][1][0][0].decode() + if id != new_id: + id = new_id + data = res[0][1][0][1] + if id and data: + #print(id) + #print(data) + + #print(data[b'message']) + try: + process.stdin.write(data[b'message']) + except: + Error_message = process.stderr.read() + if Error_message == b'tcpdump: unknown file format\n': + data_incorrect_format(session_uuid) + + #print(process.stdout.read()) + + #redis_server.xack(stream_name, group_name, id) + #redis_server.xdel(stream_name, id) + + else: + # sucess, all data are saved + if redis_server.sismember('ended_session', session_uuid): + out, err = process.communicate(timeout= 0.5) + #print(out) + if err == b'tcpdump: unknown file format\n': + data_incorrect_format(session_uuid) + else: + print(err) + + + + #print(process.stderr.read()) + #redis_server.srem('ended_session', session_uuid) + #redis_server.srem('session_uuid:{}'.format(type), session_uuid) + #redis_server.srem('working_session_uuid:{}'.format(type), session_uuid) + #redis_server.delete(stream_name) + # make sure that tcpdump can save all datas + print('DONE') + time.sleep(int(tcp_dump_cycle) + 1) + print('Exit') + sys.exit(0) + else: + time.sleep(10) diff --git a/server/workers/workers_2/workers_manager.py b/server/workers/workers_2/workers_manager.py new file mode 100755 index 0000000..95d16f7 --- /dev/null +++ b/server/workers/workers_2/workers_manager.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 + +import os +import sys +import time +import redis +import subprocess + +redis_server = redis.StrictRedis( + host="localhost", + port=6379, + db=0) +type = 1 + +try: + redis_server.ping() +except redis.exceptions.ConnectionError: + print('Error: Redis server {}:{}, ConnectionError'.format(host_redis, port_redis)) + sys.exit(1) + +if __name__ == "__main__": + stream_name = 'stream:{}'.format(type) + redis_server.delete('working_session_uuid:{}'.format(type)) + + while True: + for session_uuid in redis_server.smembers('session_uuid:{}'.format(type)): + session_uuid = session_uuid.decode() + if not redis_server.sismember('working_session_uuid:{}'.format(type), session_uuid): + + #try: + # redis_server.xgroup_create('stream:{}:{}'.format(type, session_uuid), 'workers:{}:{}'.format(type, session_uuid)) + #xcept redis.exceptions.ResponseError: + # pass + + process = subprocess.Popen(['./worker.py', session_uuid]) + print('New worker launched: {}'.format(session_uuid)) + + + print('sleeping(10)') + time.sleep(10)