mirror of https://github.com/D4-project/d4-core
chg: [server] fix
parent
653a1073b8
commit
58b2f6a19e
|
@ -74,7 +74,7 @@ function launching_workers {
|
||||||
|
|
||||||
screen -S "Workers_D4" -X screen -t "1_workers_manager" bash -c "cd ${D4_HOME}/workers/workers_1; ./workers_manager.py; read x"
|
screen -S "Workers_D4" -X screen -t "1_workers_manager" bash -c "cd ${D4_HOME}/workers/workers_1; ./workers_manager.py; read x"
|
||||||
sleep 0.1
|
sleep 0.1
|
||||||
screen -S "Workers_D4" -X screen -t "2_workers_manager" bash -c "cd ${D4_HOME}/workers/workers_4; ./workers_manager.py; read x"
|
screen -S "Workers_D4" -X screen -t "4_workers_manager" bash -c "cd ${D4_HOME}/workers/workers_4; ./workers_manager.py; read x"
|
||||||
sleep 0.1
|
sleep 0.1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -266,7 +266,8 @@ while [ "$1" != "" ]; do
|
||||||
case $1 in
|
case $1 in
|
||||||
-l | --launchAuto ) launch_all;
|
-l | --launchAuto ) launch_all;
|
||||||
;;
|
;;
|
||||||
-k | --killAll ) killall;
|
-k | --killAll ) helptext;
|
||||||
|
killall;
|
||||||
;;
|
;;
|
||||||
-h | --help ) helptext;
|
-h | --help ) helptext;
|
||||||
exit
|
exit
|
||||||
|
|
|
@ -174,7 +174,7 @@ rdbcompression yes
|
||||||
rdbchecksum yes
|
rdbchecksum yes
|
||||||
|
|
||||||
# The filename where to dump the DB
|
# The filename where to dump the DB
|
||||||
dbfilename dump.rdb
|
dbfilename dump6379.rdb
|
||||||
|
|
||||||
# The working directory.
|
# The working directory.
|
||||||
#
|
#
|
||||||
|
|
|
@ -174,7 +174,7 @@ rdbcompression yes
|
||||||
rdbchecksum yes
|
rdbchecksum yes
|
||||||
|
|
||||||
# The filename where to dump the DB
|
# The filename where to dump the DB
|
||||||
dbfilename dump.rdb
|
dbfilename dump6880.rdb
|
||||||
|
|
||||||
# The working directory.
|
# The working directory.
|
||||||
#
|
#
|
||||||
|
|
|
@ -29,7 +29,7 @@ timeout_time = 30
|
||||||
|
|
||||||
header_size = 62
|
header_size = 62
|
||||||
|
|
||||||
data_default_size_limit = 100000
|
data_default_size_limit = 1000000
|
||||||
default_max_entries_by_stream = 10000
|
default_max_entries_by_stream = 10000
|
||||||
|
|
||||||
host_redis_stream = "localhost"
|
host_redis_stream = "localhost"
|
||||||
|
@ -113,7 +113,14 @@ class Echo(Protocol, TimeoutMixin):
|
||||||
# check default size limit
|
# check default size limit
|
||||||
if data_header['size'] > data_default_size_limit:
|
if data_header['size'] > data_default_size_limit:
|
||||||
self.transport.abortConnection()
|
self.transport.abortConnection()
|
||||||
logger.warning('Incorrect data size: the server received more data than expected by default, expected={}, received={} , uuid={}, session_uuid={}'.format(data_default_size_limit, data_header['size'] ,data_header['uuid_header'], self.session_uuid))
|
logger.warning('Incorrect header data size: the server received more data than expected by default, expected={}, received={} , uuid={}, session_uuid={}'.format(data_default_size_limit, data_header['size'] ,data_header['uuid_header'], self.session_uuid))
|
||||||
|
|
||||||
|
# Worker: Incorrect type
|
||||||
|
if redis_server_stream.sismember('Error:IncorrectType:{}'.format(data_header['type']), self.session_uuid):
|
||||||
|
self.transport.abortConnection()
|
||||||
|
redis_server_stream.delete(stream_name)
|
||||||
|
redis_server_stream.srem('Error:IncorrectType:{}'.format(data_header['type']), self.session_uuid)
|
||||||
|
logger.warning('Incorrect type={} detected by worker, uuid={}, session_uuid={}'.format(data_header['type'] ,data_header['uuid_header'], self.session_uuid))
|
||||||
|
|
||||||
return data_header
|
return data_header
|
||||||
|
|
||||||
|
|
|
@ -8,18 +8,36 @@ import subprocess
|
||||||
|
|
||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
def data_incorrect_format(session_uuid):
|
def data_incorrect_format(stream_name, session_uuid, uuid):
|
||||||
|
redis_server_stream.sadd('Error:IncorrectType:{}'.format(type), session_uuid)
|
||||||
|
redis_server_metadata.hset('metadata_uuid:{}'.format(uuid), 'Error', 'Error: Type={}, Incorrect file format'.format(type))
|
||||||
|
clean_stream(stream_name, session_uuid)
|
||||||
print('Incorrect format')
|
print('Incorrect format')
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
def clean_stream(stream_name, session_uuid):
|
||||||
|
redis_server_stream.srem('ended_session', session_uuid)
|
||||||
|
redis_server_stream.srem('session_uuid:{}'.format(type), session_uuid)
|
||||||
|
redis_server_stream.srem('working_session_uuid:{}'.format(type), session_uuid)
|
||||||
|
redis_server_stream.hdel('map-type:session_uuid-uuid:{}'.format(type), session_uuid)
|
||||||
|
redis_server_stream.delete(stream_name)
|
||||||
|
|
||||||
host_redis_stream = "localhost"
|
host_redis_stream = "localhost"
|
||||||
port_redis_stream = 6379
|
port_redis_stream = 6379
|
||||||
|
|
||||||
|
host_redis_metadata = "localhost"
|
||||||
|
port_redis_metadata = 6380
|
||||||
|
|
||||||
redis_server_stream = redis.StrictRedis(
|
redis_server_stream = redis.StrictRedis(
|
||||||
host=host_redis_stream,
|
host=host_redis_stream,
|
||||||
port=port_redis_stream,
|
port=port_redis_stream,
|
||||||
db=0)
|
db=0)
|
||||||
|
|
||||||
|
redis_server_metadata = redis.StrictRedis(
|
||||||
|
host=host_redis_metadata,
|
||||||
|
port=port_redis_metadata,
|
||||||
|
db=0)
|
||||||
|
|
||||||
type = 1
|
type = 1
|
||||||
tcp_dump_cycle = '300'
|
tcp_dump_cycle = '300'
|
||||||
stream_buffer = 100
|
stream_buffer = 100
|
||||||
|
@ -51,7 +69,7 @@ if __name__ == "__main__":
|
||||||
redis_server_stream.sadd('working_session_uuid:{}'.format(type), session_uuid)
|
redis_server_stream.sadd('working_session_uuid:{}'.format(type), session_uuid)
|
||||||
|
|
||||||
#LAUNCH a tcpdump
|
#LAUNCH a tcpdump
|
||||||
process = subprocess.Popen(["tcpdump", '-n', '-r', '-', '-G', tcp_dump_cycle, '-w', '{}/%Y/%m/%d/{}-%Y-%m-%d-%H%M%S.cap'.format(tcpdump_path, uuid)], stdin=subprocess.PIPE)
|
process = subprocess.Popen(["tcpdump", '-n', '-r', '-', '-G', tcp_dump_cycle, '-w', '{}/%Y/%m/%d/{}-%Y-%m-%d-%H%M%S.cap'.format(tcpdump_path, uuid)], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||||
nb_save = 0
|
nb_save = 0
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
@ -78,7 +96,7 @@ if __name__ == "__main__":
|
||||||
except:
|
except:
|
||||||
Error_message = process.stderr.read()
|
Error_message = process.stderr.read()
|
||||||
if Error_message == b'tcpdump: unknown file format\n':
|
if Error_message == b'tcpdump: unknown file format\n':
|
||||||
data_incorrect_format(session_uuid)
|
data_incorrect_format(stream_name, session_uuid, uuid)
|
||||||
|
|
||||||
#print(process.stdout.read())
|
#print(process.stdout.read())
|
||||||
nb_save += 1
|
nb_save += 1
|
||||||
|
@ -95,7 +113,7 @@ if __name__ == "__main__":
|
||||||
out, err = process.communicate(timeout= 0.5)
|
out, err = process.communicate(timeout= 0.5)
|
||||||
#print(out)
|
#print(out)
|
||||||
if err == b'tcpdump: unknown file format\n':
|
if err == b'tcpdump: unknown file format\n':
|
||||||
data_incorrect_format(session_uuid)
|
data_incorrect_format(stream_name, session_uuid, uuid)
|
||||||
elif err:
|
elif err:
|
||||||
print(err)
|
print(err)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue