mirror of https://github.com/D4-project/d4-core
chg: [server] refractor stream worker + add uuid blacklist + handle pcap (type 2)
parent
6472ba8a21
commit
b66444540c
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
import uuid
|
||||||
import hmac
|
import hmac
|
||||||
import stat
|
import stat
|
||||||
import redis
|
import redis
|
||||||
|
@ -16,33 +17,40 @@ from twisted.python.modules import getModule
|
||||||
from twisted.internet.protocol import Protocol
|
from twisted.internet.protocol import Protocol
|
||||||
from twisted.protocols.policies import TimeoutMixin
|
from twisted.protocols.policies import TimeoutMixin
|
||||||
|
|
||||||
|
|
||||||
from ctypes import *
|
|
||||||
from uuid import UUID
|
|
||||||
|
|
||||||
hmac_reset = bytearray(32)
|
hmac_reset = bytearray(32)
|
||||||
hmac_key = b'private key to change\n'
|
hmac_key = b'private key to change'
|
||||||
|
|
||||||
timeout_time = 30
|
timeout_time = 30
|
||||||
|
|
||||||
header_size = 62
|
header_size = 62
|
||||||
|
|
||||||
|
data_default_size_limit = 100000
|
||||||
|
|
||||||
|
host_redis="localhost"
|
||||||
|
port_redis=6379
|
||||||
redis_server = redis.StrictRedis(
|
redis_server = redis.StrictRedis(
|
||||||
host="localhost",
|
host=host_redis,
|
||||||
port=6379,
|
port=port_redis,
|
||||||
db=0)
|
db=0)
|
||||||
|
|
||||||
|
try:
|
||||||
|
redis_server.ping()
|
||||||
|
except redis.exceptions.ConnectionError:
|
||||||
|
print('Error: Redis server {}:{}, ConnectionError'.format(host_redis, port_redis))
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
class Echo(Protocol, TimeoutMixin):
|
class Echo(Protocol, TimeoutMixin):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.buffer = b''
|
self.buffer = b''
|
||||||
self.setTimeout(timeout_time)
|
self.setTimeout(timeout_time)
|
||||||
|
self.session_uuid = str(uuid.uuid4())
|
||||||
|
|
||||||
def dataReceived(self, data):
|
def dataReceived(self, data):
|
||||||
self.resetTimeout()
|
self.resetTimeout()
|
||||||
ip, source_port = self.transport.client
|
ip, source_port = self.transport.client
|
||||||
# check blacklisted_ip
|
# check blacklisted_ip
|
||||||
if redis_server.sismember('blacklisted_ip', ip):
|
if redis_server.sismember('blacklist_ip', ip):
|
||||||
self.transport.abortConnection()
|
self.transport.abortConnection()
|
||||||
#print(ip)
|
#print(ip)
|
||||||
#print(source_port)
|
#print(source_port)
|
||||||
|
@ -54,6 +62,10 @@ class Echo(Protocol, TimeoutMixin):
|
||||||
self.buffer = b''
|
self.buffer = b''
|
||||||
#self.transport.abortConnection()
|
#self.transport.abortConnection()
|
||||||
|
|
||||||
|
def connectionLost(self, reason):
|
||||||
|
#print("Done")
|
||||||
|
redis_server.sadd('ended_session', self.session_uuid)
|
||||||
|
|
||||||
def unpack_header(self, data):
|
def unpack_header(self, data):
|
||||||
data_header = {}
|
data_header = {}
|
||||||
if len(data) >= header_size:
|
if len(data) >= header_size:
|
||||||
|
@ -64,18 +76,26 @@ class Echo(Protocol, TimeoutMixin):
|
||||||
data_header['hmac_header'] = data[26:58]
|
data_header['hmac_header'] = data[26:58]
|
||||||
data_header['size'] = struct.unpack('I', data[58:62])[0]
|
data_header['size'] = struct.unpack('I', data[58:62])[0]
|
||||||
|
|
||||||
|
# uuid blacklist
|
||||||
|
if redis_server.sismember('blacklist_uuid', data_header['uuid_header']):
|
||||||
|
self.transport.abortConnection()
|
||||||
|
|
||||||
|
# check default size limit
|
||||||
|
if data_header['size'] > data_default_size_limit:
|
||||||
|
self.transport.abortConnection()
|
||||||
|
|
||||||
return data_header
|
return data_header
|
||||||
|
|
||||||
def is_valid_uuid_v4(self, header_uuid):
|
def is_valid_uuid_v4(self, header_uuid):
|
||||||
try:
|
try:
|
||||||
uuid_test = UUID(hex=header_uuid, version=4)
|
uuid_test = uuid.UUID(hex=header_uuid, version=4)
|
||||||
return uuid_test.hex == header_uuid
|
return uuid_test.hex == header_uuid
|
||||||
except:
|
except:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# # TODO: check timestamp
|
# # TODO: check timestamp
|
||||||
def is_valid_header(self, uuid):
|
def is_valid_header(self, uuid_to_check):
|
||||||
if self.is_valid_uuid_v4(uuid):
|
if self.is_valid_uuid_v4(uuid_to_check):
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
|
@ -158,13 +178,13 @@ class Echo(Protocol, TimeoutMixin):
|
||||||
#print('timestamp: {}'.format( data_header['timestamp'] ))
|
#print('timestamp: {}'.format( data_header['timestamp'] ))
|
||||||
#print('hmac: {}'.format( data_header['hmac_header'] ))
|
#print('hmac: {}'.format( data_header['hmac_header'] ))
|
||||||
#print('size: {}'.format( data_header['size'] ))
|
#print('size: {}'.format( data_header['size'] ))
|
||||||
#print(d4_header)
|
|
||||||
### ###
|
### ###
|
||||||
|
|
||||||
if data_header['hmac_header'] == HMAC.hexdigest():
|
if data_header['hmac_header'] == HMAC.hexdigest():
|
||||||
#print('hmac match')
|
#print('hmac match')
|
||||||
date = datetime.datetime.now().strftime("%Y%m%d")
|
date = datetime.datetime.now().strftime("%Y%m%d")
|
||||||
redis_server.xadd('stream:{}'.format(data_header['type']), {'message': data[header_size:], 'uuid': data_header['uuid_header'], 'timestamp': data_header['timestamp'], 'version': data_header['version']})
|
redis_server.xadd('stream:{}:{}'.format(data_header['type'], self.session_uuid), {'message': data[header_size:], 'uuid': data_header['uuid_header'], 'timestamp': data_header['timestamp'], 'version': data_header['version']})
|
||||||
|
redis_server.sadd('session_uuid:{}'.format(data_header['type']), self.session_uuid.encode())
|
||||||
redis_server.sadd('daily_uuid:{}'.format(date), data_header['uuid_header'])
|
redis_server.sadd('daily_uuid:{}'.format(date), data_header['uuid_header'])
|
||||||
redis_server.zincrby('stat_uuid_ip:{}:{}'.format(date, data_header['uuid_header']), 1, ip)
|
redis_server.zincrby('stat_uuid_ip:{}:{}'.format(date, data_header['uuid_header']), 1, ip)
|
||||||
redis_server.sadd('daily_ip:{}'.format(date), ip)
|
redis_server.sadd('daily_ip:{}'.format(date), ip)
|
||||||
|
|
|
@ -0,0 +1,101 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import redis
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
import datetime
|
||||||
|
|
||||||
|
def data_incorrect_format(session_uuid):
|
||||||
|
print('Incorrect format')
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
redis_server = redis.StrictRedis(
|
||||||
|
host="localhost",
|
||||||
|
port=6379,
|
||||||
|
db=0)
|
||||||
|
|
||||||
|
type = 1
|
||||||
|
tcp_dump_cycle = '5'
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
|
||||||
|
if len(sys.argv) != 2:
|
||||||
|
print('usage:', 'Worker.py', 'session_uuid')
|
||||||
|
exit(1)
|
||||||
|
|
||||||
|
session_uuid = sys.argv[1]
|
||||||
|
stream_name = 'stream:{}:{}'.format(type, session_uuid)
|
||||||
|
consumer_name = 'consumer:{}:{}'.format(type, session_uuid)
|
||||||
|
group_name = 'workers:{}:{}'.format(type, session_uuid)
|
||||||
|
id = '0'
|
||||||
|
|
||||||
|
res = redis_server.xread({stream_name: id}, count=1)
|
||||||
|
#print(res)
|
||||||
|
if res:
|
||||||
|
uuid = res[0][1][0][1][b'uuid'].decode()
|
||||||
|
else:
|
||||||
|
sys.exit(1)
|
||||||
|
print('Incorrect message')
|
||||||
|
redis_server.sadd('working_session_uuid:{}'.format(type), session_uuid)
|
||||||
|
|
||||||
|
#LAUNCH a tcpdump
|
||||||
|
#process = subprocess.Popen(["tcpdump", '-n', '-r', '-', '-G', '5', '-w', '{}/%Y/%m/%d/%H%M%S.cap'.format(uuid)], stdin=subprocess.PIPE)
|
||||||
|
process = subprocess.Popen(["tcpdump", '-n', '-r', '-', '-G', tcp_dump_cycle, '-w', '{}-%Y%m%d%H%M%S.cap'.format(uuid)], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||||
|
#redis_server.xgroup_create('stream:{}:{}'.format(type, session_uuid), 'workers:{}:{}'.format(type, session_uuid))
|
||||||
|
|
||||||
|
while True:
|
||||||
|
#print(redis_server.xpending(stream_name, group_name))
|
||||||
|
#redis_server.sadd('working_session_uuid:{}'.format(type), session_uuid)
|
||||||
|
|
||||||
|
#res = redis_server.xreadgroup(group_name, consumer_name, {stream_name: '1547198181015-0'}, count=1)
|
||||||
|
res = redis_server.xread({stream_name: id}, count=1)
|
||||||
|
#print(res)
|
||||||
|
if res:
|
||||||
|
new_id = res[0][1][0][0].decode()
|
||||||
|
if id != new_id:
|
||||||
|
id = new_id
|
||||||
|
data = res[0][1][0][1]
|
||||||
|
if id and data:
|
||||||
|
#print(id)
|
||||||
|
#print(data)
|
||||||
|
|
||||||
|
#print(data[b'message'])
|
||||||
|
try:
|
||||||
|
process.stdin.write(data[b'message'])
|
||||||
|
except:
|
||||||
|
Error_message = process.stderr.read()
|
||||||
|
if Error_message == b'tcpdump: unknown file format\n':
|
||||||
|
data_incorrect_format(session_uuid)
|
||||||
|
|
||||||
|
#print(process.stdout.read())
|
||||||
|
|
||||||
|
#redis_server.xack(stream_name, group_name, id)
|
||||||
|
#redis_server.xdel(stream_name, id)
|
||||||
|
|
||||||
|
else:
|
||||||
|
# sucess, all data are saved
|
||||||
|
if redis_server.sismember('ended_session', session_uuid):
|
||||||
|
out, err = process.communicate(timeout= 0.5)
|
||||||
|
#print(out)
|
||||||
|
if err == b'tcpdump: unknown file format\n':
|
||||||
|
data_incorrect_format(session_uuid)
|
||||||
|
else:
|
||||||
|
print(err)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
#print(process.stderr.read())
|
||||||
|
#redis_server.srem('ended_session', session_uuid)
|
||||||
|
#redis_server.srem('session_uuid:{}'.format(type), session_uuid)
|
||||||
|
#redis_server.srem('working_session_uuid:{}'.format(type), session_uuid)
|
||||||
|
#redis_server.delete(stream_name)
|
||||||
|
# make sure that tcpdump can save all datas
|
||||||
|
print('DONE')
|
||||||
|
time.sleep(int(tcp_dump_cycle) + 1)
|
||||||
|
print('Exit')
|
||||||
|
sys.exit(0)
|
||||||
|
else:
|
||||||
|
time.sleep(10)
|
|
@ -0,0 +1,40 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import redis
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
redis_server = redis.StrictRedis(
|
||||||
|
host="localhost",
|
||||||
|
port=6379,
|
||||||
|
db=0)
|
||||||
|
type = 1
|
||||||
|
|
||||||
|
try:
|
||||||
|
redis_server.ping()
|
||||||
|
except redis.exceptions.ConnectionError:
|
||||||
|
print('Error: Redis server {}:{}, ConnectionError'.format(host_redis, port_redis))
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
stream_name = 'stream:{}'.format(type)
|
||||||
|
redis_server.delete('working_session_uuid:{}'.format(type))
|
||||||
|
|
||||||
|
while True:
|
||||||
|
for session_uuid in redis_server.smembers('session_uuid:{}'.format(type)):
|
||||||
|
session_uuid = session_uuid.decode()
|
||||||
|
if not redis_server.sismember('working_session_uuid:{}'.format(type), session_uuid):
|
||||||
|
|
||||||
|
#try:
|
||||||
|
# redis_server.xgroup_create('stream:{}:{}'.format(type, session_uuid), 'workers:{}:{}'.format(type, session_uuid))
|
||||||
|
#xcept redis.exceptions.ResponseError:
|
||||||
|
# pass
|
||||||
|
|
||||||
|
process = subprocess.Popen(['./worker.py', session_uuid])
|
||||||
|
print('New worker launched: {}'.format(session_uuid))
|
||||||
|
|
||||||
|
|
||||||
|
print('sleeping(10)')
|
||||||
|
time.sleep(10)
|
Loading…
Reference in New Issue