mirror of https://github.com/D4-project/d4-core
chg: [server] chg tcpdump filepath + add basic logging
parent
b82786e911
commit
a7d5348ca6
|
@ -9,6 +9,9 @@ import redis
|
|||
import struct
|
||||
import time
|
||||
import datetime
|
||||
import argparse
|
||||
import logging
|
||||
import logging.handlers
|
||||
|
||||
from twisted.internet import ssl, task, protocol, endpoints, defer
|
||||
from twisted.python import log
|
||||
|
@ -18,7 +21,7 @@ from twisted.internet.protocol import Protocol
|
|||
from twisted.protocols.policies import TimeoutMixin
|
||||
|
||||
hmac_reset = bytearray(32)
|
||||
hmac_key = b'private key to change'
|
||||
hmac_key = b'private key to change\n'
|
||||
|
||||
timeout_time = 30
|
||||
|
||||
|
@ -45,6 +48,7 @@ class Echo(Protocol, TimeoutMixin):
|
|||
self.buffer = b''
|
||||
self.setTimeout(timeout_time)
|
||||
self.session_uuid = str(uuid.uuid4())
|
||||
self.data_saved = False
|
||||
|
||||
def dataReceived(self, data):
|
||||
self.resetTimeout()
|
||||
|
@ -52,18 +56,15 @@ class Echo(Protocol, TimeoutMixin):
|
|||
# check blacklisted_ip
|
||||
if redis_server.sismember('blacklist_ip', ip):
|
||||
self.transport.abortConnection()
|
||||
#print(ip)
|
||||
#print(source_port)
|
||||
|
||||
self.process_header(data, ip, source_port)
|
||||
|
||||
def timeoutConnection(self):
|
||||
#print('timeout')
|
||||
self.resetTimeout()
|
||||
self.buffer = b''
|
||||
#self.transport.abortConnection()
|
||||
|
||||
def connectionLost(self, reason):
|
||||
#print("Done")
|
||||
redis_server.sadd('ended_session', self.session_uuid)
|
||||
|
||||
def unpack_header(self, data):
|
||||
|
@ -134,7 +135,7 @@ class Echo(Protocol, TimeoutMixin):
|
|||
print('discard data')
|
||||
print(data_header)
|
||||
print(data)
|
||||
time.sleep(5)
|
||||
#time.sleep(5)
|
||||
#sys.exit(1)
|
||||
else:
|
||||
if len(data) < header_size:
|
||||
|
@ -143,7 +144,7 @@ class Echo(Protocol, TimeoutMixin):
|
|||
print('error discard data')
|
||||
print(data_header)
|
||||
print(data)
|
||||
time.sleep(5)
|
||||
#time.sleep(5)
|
||||
#sys.exit(1)
|
||||
|
||||
# not a header
|
||||
|
@ -151,8 +152,8 @@ class Echo(Protocol, TimeoutMixin):
|
|||
# add previous data
|
||||
if len(data) < header_size:
|
||||
self.buffer += data
|
||||
print(self.buffer)
|
||||
print(len(self.buffer))
|
||||
#print(self.buffer)
|
||||
#print(len(self.buffer))
|
||||
#todo check if valid header before adding ?
|
||||
else:
|
||||
data = self.buffer + data
|
||||
|
@ -180,17 +181,20 @@ class Echo(Protocol, TimeoutMixin):
|
|||
#print('size: {}'.format( data_header['size'] ))
|
||||
### ###
|
||||
|
||||
# hmac match
|
||||
if data_header['hmac_header'] == HMAC.hexdigest():
|
||||
#print('hmac match')
|
||||
date = datetime.datetime.now().strftime("%Y%m%d")
|
||||
redis_server.xadd('stream:{}:{}'.format(data_header['type'], self.session_uuid), {'message': data[header_size:], 'uuid': data_header['uuid_header'], 'timestamp': data_header['timestamp'], 'version': data_header['version']})
|
||||
redis_server.sadd('session_uuid:{}'.format(data_header['type']), self.session_uuid.encode())
|
||||
redis_server.sadd('daily_uuid:{}'.format(date), data_header['uuid_header'])
|
||||
redis_server.zincrby('stat_uuid_ip:{}:{}'.format(date, data_header['uuid_header']), 1, ip)
|
||||
redis_server.sadd('daily_ip:{}'.format(date), ip)
|
||||
redis_server.zincrby('stat_ip_uuid:{}:{}'.format(date, ip), 1, data_header['uuid_header'])
|
||||
#with open(data_header['uuid_header'], 'ab') as f:
|
||||
# f.write(data[header_size:])
|
||||
|
||||
redis_server.sadd('daily_uuid:{}'.format(date), data_header['uuid_header'])
|
||||
redis_server.sadd('daily_ip:{}'.format(date), ip)
|
||||
|
||||
if not self.data_saved:
|
||||
redis_server.sadd('session_uuid:{}'.format(data_header['type']), self.session_uuid.encode())
|
||||
redis_server.hset('map-type:session_uuid-uuid:{}'.format(data_header['type']), self.session_uuid, data_header['uuid_header'])
|
||||
self.data_saved = True
|
||||
else:
|
||||
print('hmac do not match')
|
||||
print(data)
|
||||
|
@ -212,4 +216,21 @@ def main(reactor):
|
|||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('-v', '--verbose',help='dddd' , type=int, default=30)
|
||||
args = parser.parse_args()
|
||||
print(args.verbose)
|
||||
|
||||
log_filename = 'd4-server-logs.log'
|
||||
logger = logging.getLogger()
|
||||
#formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
|
||||
handler_log = logging.handlers.TimedRotatingFileHandler(log_filename, when="midnight", interval=1)
|
||||
handler_log.suffix = '%Y-%m-%d-{}'.format(log_filename)
|
||||
handler_log.setFormatter(formatter)
|
||||
logger.addHandler(handler_log)
|
||||
logger.setLevel(args.verbose)
|
||||
|
||||
logger.error('test')
|
||||
|
||||
task.react(main)
|
||||
|
|
|
@ -33,24 +33,26 @@ if __name__ == "__main__":
|
|||
id = '0'
|
||||
|
||||
res = redis_server.xread({stream_name: id}, count=1)
|
||||
#print(res)
|
||||
if res:
|
||||
uuid = res[0][1][0][1][b'uuid'].decode()
|
||||
date = datetime.datetime.now().strftime("%Y%m%d")
|
||||
tcpdump_path = os.path.join('../../data', uuid, str(type))
|
||||
rel_path = os.path.join(tcpdump_path, date[0:4], date[4:6], date[6:8])
|
||||
if not os.path.isdir(rel_path):
|
||||
os.makedirs(rel_path)
|
||||
else:
|
||||
sys.exit(1)
|
||||
print('Incorrect message')
|
||||
redis_server.sadd('working_session_uuid:{}'.format(type), session_uuid)
|
||||
|
||||
#LAUNCH a tcpdump
|
||||
#process = subprocess.Popen(["tcpdump", '-n', '-r', '-', '-G', '5', '-w', '{}/%Y/%m/%d/%H%M%S.cap'.format(uuid)], stdin=subprocess.PIPE)
|
||||
process = subprocess.Popen(["tcpdump", '-n', '-r', '-', '-G', tcp_dump_cycle, '-w', '{}-%Y%m%d%H%M%S.cap'.format(uuid)], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
process = subprocess.Popen(["tcpdump", '-n', '-r', '-', '-G', tcp_dump_cycle, '-w', '{}/%Y/%m/%d/{}-%Y-%m-%d-%H%M%S.cap'.format(tcpdump_path, uuid)], stdin=subprocess.PIPE)
|
||||
#redis_server.xgroup_create('stream:{}:{}'.format(type, session_uuid), 'workers:{}:{}'.format(type, session_uuid))
|
||||
|
||||
while True:
|
||||
#print(redis_server.xpending(stream_name, group_name))
|
||||
#redis_server.sadd('working_session_uuid:{}'.format(type), session_uuid)
|
||||
|
||||
#res = redis_server.xreadgroup(group_name, consumer_name, {stream_name: '1547198181015-0'}, count=1)
|
||||
res = redis_server.xread({stream_name: id}, count=1)
|
||||
#print(res)
|
||||
if res:
|
||||
|
@ -61,8 +63,14 @@ if __name__ == "__main__":
|
|||
if id and data:
|
||||
#print(id)
|
||||
#print(data)
|
||||
new_date = datetime.datetime.now().strftime("%Y%m%d")
|
||||
if new_date != date:
|
||||
print('rrr')
|
||||
date= new_date
|
||||
rel_path = os.path.join(tcpdump_path, date[0:4], date[4:6], date[6:8])
|
||||
if not os.path.isdir(rel_path):
|
||||
os.makedirs(rel_path)
|
||||
|
||||
#print(data[b'message'])
|
||||
try:
|
||||
process.stdin.write(data[b'message'])
|
||||
except:
|
||||
|
@ -72,9 +80,6 @@ if __name__ == "__main__":
|
|||
|
||||
#print(process.stdout.read())
|
||||
|
||||
#redis_server.xack(stream_name, group_name, id)
|
||||
#redis_server.xdel(stream_name, id)
|
||||
|
||||
else:
|
||||
# sucess, all data are saved
|
||||
if redis_server.sismember('ended_session', session_uuid):
|
||||
|
@ -82,20 +87,18 @@ if __name__ == "__main__":
|
|||
#print(out)
|
||||
if err == b'tcpdump: unknown file format\n':
|
||||
data_incorrect_format(session_uuid)
|
||||
else:
|
||||
elif err:
|
||||
print(err)
|
||||
|
||||
|
||||
|
||||
#print(process.stderr.read())
|
||||
#redis_server.srem('ended_session', session_uuid)
|
||||
#redis_server.srem('session_uuid:{}'.format(type), session_uuid)
|
||||
#redis_server.srem('working_session_uuid:{}'.format(type), session_uuid)
|
||||
#redis_server.delete(stream_name)
|
||||
redis_server.srem('ended_session', session_uuid)
|
||||
redis_server.srem('session_uuid:{}'.format(type), session_uuid)
|
||||
redis_server.srem('working_session_uuid:{}'.format(type), session_uuid)
|
||||
redis_server.hdel('map-type:session_uuid-uuid:{}'.format(type), session_uuid)
|
||||
redis_server.delete(stream_name)
|
||||
# make sure that tcpdump can save all datas
|
||||
print('DONE')
|
||||
time.sleep(int(tcp_dump_cycle) + 1)
|
||||
print('Exit')
|
||||
print('tcpdump: {} Done'.format(session_uuid))
|
||||
sys.exit(0)
|
||||
else:
|
||||
time.sleep(10)
|
|
@ -27,14 +27,9 @@ if __name__ == "__main__":
|
|||
session_uuid = session_uuid.decode()
|
||||
if not redis_server.sismember('working_session_uuid:{}'.format(type), session_uuid):
|
||||
|
||||
#try:
|
||||
# redis_server.xgroup_create('stream:{}:{}'.format(type, session_uuid), 'workers:{}:{}'.format(type, session_uuid))
|
||||
#xcept redis.exceptions.ResponseError:
|
||||
# pass
|
||||
|
||||
process = subprocess.Popen(['./worker.py', session_uuid])
|
||||
print('New worker launched: {}'.format(session_uuid))
|
||||
|
||||
|
||||
print('sleeping(10)')
|
||||
#print('.')
|
||||
time.sleep(10)
|
Loading…
Reference in New Issue