diff --git a/server/server.py b/server/server.py index 979986a..207fccf 100755 --- a/server/server.py +++ b/server/server.py @@ -9,6 +9,9 @@ import redis import struct import time import datetime +import argparse +import logging +import logging.handlers from twisted.internet import ssl, task, protocol, endpoints, defer from twisted.python import log @@ -18,7 +21,7 @@ from twisted.internet.protocol import Protocol from twisted.protocols.policies import TimeoutMixin hmac_reset = bytearray(32) -hmac_key = b'private key to change' +hmac_key = b'private key to change\n' timeout_time = 30 @@ -45,6 +48,7 @@ class Echo(Protocol, TimeoutMixin): self.buffer = b'' self.setTimeout(timeout_time) self.session_uuid = str(uuid.uuid4()) + self.data_saved = False def dataReceived(self, data): self.resetTimeout() @@ -52,18 +56,15 @@ class Echo(Protocol, TimeoutMixin): # check blacklisted_ip if redis_server.sismember('blacklist_ip', ip): self.transport.abortConnection() - #print(ip) - #print(source_port) + self.process_header(data, ip, source_port) def timeoutConnection(self): - #print('timeout') self.resetTimeout() self.buffer = b'' #self.transport.abortConnection() def connectionLost(self, reason): - #print("Done") redis_server.sadd('ended_session', self.session_uuid) def unpack_header(self, data): @@ -134,7 +135,7 @@ class Echo(Protocol, TimeoutMixin): print('discard data') print(data_header) print(data) - time.sleep(5) + #time.sleep(5) #sys.exit(1) else: if len(data) < header_size: @@ -143,7 +144,7 @@ class Echo(Protocol, TimeoutMixin): print('error discard data') print(data_header) print(data) - time.sleep(5) + #time.sleep(5) #sys.exit(1) # not a header @@ -151,8 +152,8 @@ class Echo(Protocol, TimeoutMixin): # add previous data if len(data) < header_size: self.buffer += data - print(self.buffer) - print(len(self.buffer)) + #print(self.buffer) + #print(len(self.buffer)) #todo check if valid header before adding ? else: data = self.buffer + data @@ -180,17 +181,20 @@ class Echo(Protocol, TimeoutMixin): #print('size: {}'.format( data_header['size'] )) ### ### + # hmac match if data_header['hmac_header'] == HMAC.hexdigest(): - #print('hmac match') date = datetime.datetime.now().strftime("%Y%m%d") redis_server.xadd('stream:{}:{}'.format(data_header['type'], self.session_uuid), {'message': data[header_size:], 'uuid': data_header['uuid_header'], 'timestamp': data_header['timestamp'], 'version': data_header['version']}) - redis_server.sadd('session_uuid:{}'.format(data_header['type']), self.session_uuid.encode()) - redis_server.sadd('daily_uuid:{}'.format(date), data_header['uuid_header']) redis_server.zincrby('stat_uuid_ip:{}:{}'.format(date, data_header['uuid_header']), 1, ip) - redis_server.sadd('daily_ip:{}'.format(date), ip) redis_server.zincrby('stat_ip_uuid:{}:{}'.format(date, ip), 1, data_header['uuid_header']) - #with open(data_header['uuid_header'], 'ab') as f: - # f.write(data[header_size:]) + + redis_server.sadd('daily_uuid:{}'.format(date), data_header['uuid_header']) + redis_server.sadd('daily_ip:{}'.format(date), ip) + + if not self.data_saved: + redis_server.sadd('session_uuid:{}'.format(data_header['type']), self.session_uuid.encode()) + redis_server.hset('map-type:session_uuid-uuid:{}'.format(data_header['type']), self.session_uuid, data_header['uuid_header']) + self.data_saved = True else: print('hmac do not match') print(data) @@ -212,4 +216,21 @@ def main(reactor): if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument('-v', '--verbose',help='dddd' , type=int, default=30) + args = parser.parse_args() + print(args.verbose) + + log_filename = 'd4-server-logs.log' + logger = logging.getLogger() + #formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + handler_log = logging.handlers.TimedRotatingFileHandler(log_filename, when="midnight", interval=1) + handler_log.suffix = '%Y-%m-%d-{}'.format(log_filename) + handler_log.setFormatter(formatter) + logger.addHandler(handler_log) + logger.setLevel(args.verbose) + + logger.error('test') + task.react(main) diff --git a/server/workers/workers_2/worker.py b/server/workers/workers_1/worker.py similarity index 67% rename from server/workers/workers_2/worker.py rename to server/workers/workers_1/worker.py index d03b8ff..ae28f59 100755 --- a/server/workers/workers_2/worker.py +++ b/server/workers/workers_1/worker.py @@ -33,24 +33,26 @@ if __name__ == "__main__": id = '0' res = redis_server.xread({stream_name: id}, count=1) - #print(res) if res: uuid = res[0][1][0][1][b'uuid'].decode() + date = datetime.datetime.now().strftime("%Y%m%d") + tcpdump_path = os.path.join('../../data', uuid, str(type)) + rel_path = os.path.join(tcpdump_path, date[0:4], date[4:6], date[6:8]) + if not os.path.isdir(rel_path): + os.makedirs(rel_path) else: sys.exit(1) print('Incorrect message') redis_server.sadd('working_session_uuid:{}'.format(type), session_uuid) #LAUNCH a tcpdump - #process = subprocess.Popen(["tcpdump", '-n', '-r', '-', '-G', '5', '-w', '{}/%Y/%m/%d/%H%M%S.cap'.format(uuid)], stdin=subprocess.PIPE) - process = subprocess.Popen(["tcpdump", '-n', '-r', '-', '-G', tcp_dump_cycle, '-w', '{}-%Y%m%d%H%M%S.cap'.format(uuid)], stdin=subprocess.PIPE, stderr=subprocess.PIPE) + process = subprocess.Popen(["tcpdump", '-n', '-r', '-', '-G', tcp_dump_cycle, '-w', '{}/%Y/%m/%d/{}-%Y-%m-%d-%H%M%S.cap'.format(tcpdump_path, uuid)], stdin=subprocess.PIPE) #redis_server.xgroup_create('stream:{}:{}'.format(type, session_uuid), 'workers:{}:{}'.format(type, session_uuid)) while True: #print(redis_server.xpending(stream_name, group_name)) #redis_server.sadd('working_session_uuid:{}'.format(type), session_uuid) - #res = redis_server.xreadgroup(group_name, consumer_name, {stream_name: '1547198181015-0'}, count=1) res = redis_server.xread({stream_name: id}, count=1) #print(res) if res: @@ -61,8 +63,14 @@ if __name__ == "__main__": if id and data: #print(id) #print(data) + new_date = datetime.datetime.now().strftime("%Y%m%d") + if new_date != date: + print('rrr') + date= new_date + rel_path = os.path.join(tcpdump_path, date[0:4], date[4:6], date[6:8]) + if not os.path.isdir(rel_path): + os.makedirs(rel_path) - #print(data[b'message']) try: process.stdin.write(data[b'message']) except: @@ -72,9 +80,6 @@ if __name__ == "__main__": #print(process.stdout.read()) - #redis_server.xack(stream_name, group_name, id) - #redis_server.xdel(stream_name, id) - else: # sucess, all data are saved if redis_server.sismember('ended_session', session_uuid): @@ -82,20 +87,18 @@ if __name__ == "__main__": #print(out) if err == b'tcpdump: unknown file format\n': data_incorrect_format(session_uuid) - else: + elif err: print(err) - - #print(process.stderr.read()) - #redis_server.srem('ended_session', session_uuid) - #redis_server.srem('session_uuid:{}'.format(type), session_uuid) - #redis_server.srem('working_session_uuid:{}'.format(type), session_uuid) - #redis_server.delete(stream_name) + redis_server.srem('ended_session', session_uuid) + redis_server.srem('session_uuid:{}'.format(type), session_uuid) + redis_server.srem('working_session_uuid:{}'.format(type), session_uuid) + redis_server.hdel('map-type:session_uuid-uuid:{}'.format(type), session_uuid) + redis_server.delete(stream_name) # make sure that tcpdump can save all datas - print('DONE') time.sleep(int(tcp_dump_cycle) + 1) - print('Exit') + print('tcpdump: {} Done'.format(session_uuid)) sys.exit(0) else: time.sleep(10) diff --git a/server/workers/workers_2/workers_manager.py b/server/workers/workers_1/workers_manager.py similarity index 77% rename from server/workers/workers_2/workers_manager.py rename to server/workers/workers_1/workers_manager.py index 95d16f7..2b3b42f 100755 --- a/server/workers/workers_2/workers_manager.py +++ b/server/workers/workers_1/workers_manager.py @@ -27,14 +27,9 @@ if __name__ == "__main__": session_uuid = session_uuid.decode() if not redis_server.sismember('working_session_uuid:{}'.format(type), session_uuid): - #try: - # redis_server.xgroup_create('stream:{}:{}'.format(type, session_uuid), 'workers:{}:{}'.format(type, session_uuid)) - #xcept redis.exceptions.ResponseError: - # pass - process = subprocess.Popen(['./worker.py', session_uuid]) print('New worker launched: {}'.format(session_uuid)) - print('sleeping(10)') + #print('.') time.sleep(10)