mirror of https://github.com/CIRCL/AIL-framework

commit 99c8cc7941
parent 8d9ffbaa53

    completely remove ZMQ_PubSub.py
@@ -45,10 +45,23 @@ class Redis_Queues(object):
         self.sub_socket.connect(sub_address)
         self.sub_socket.setsockopt(zmq.SUBSCRIBE, self.sub_channel)

-    def zmq_pub(self, config_section):
+    def zmq_pub(self, config_section, config_channel):
         context = zmq.Context()
         self.pub_socket = context.socket(zmq.PUB)
         self.pub_socket.bind(self.config.get(config_section, 'adress'))
+        if config_channel is not None:
+            self.pub_channel = self.config.get(config_section, config_channel)
+        else:
+            # The publishing channel is defined dynamically
+            self.pub_channel = None
+
+    def zmq_pub_send(self, msg):
+        if self.pub_channel is None:
+            raise Exception('A channel is reqired to send a message.')
+        self.pub_socket.send('{} {}'.format(self.pub_channel, msg))
+
+    def redis_rpop(self):
+        return self.r_queues.rpop(self.sub_channel + self.subscriber_name)

     def redis_queue_shutdown(self, is_queue=False):
         if is_queue:
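Note on the new Helper API above: zmq_pub() now resolves the publishing
channel once from the config section (or leaves it dynamic when
config_channel is None), zmq_pub_send() prefixes that channel to every
outgoing message, and redis_rpop() hides the queue-naming convention
(subscriber channel + subscriber name) that every module used to spell out.
A minimal usage sketch, with illustrative section/name values that are not
part of this commit:

    import Helper

    h = Helper.Redis_Queues('PubSub_Global', 'channel', 'demo')

    # Fixed channel: looked up once, then prefixed to each message,
    # so subscribers receive '<channel> <msg>' on the wire.
    h.zmq_pub('PubSub_Global', 'channel')
    h.zmq_pub_send('/path/to/paste')

    # Consuming side: pop from the per-subscriber Redis queue.
    message = h.redis_rpop()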
@@ -39,15 +39,15 @@ if __name__ == "__main__":

     # Publisher
     pub_config_section = "PubSub_Global"
-    h.zmq_pub(pub_config_section)
-    pub_channel = h.config.get(pub_config_section, "channel")
+    pub_config_channel = 'channel'
+    h.zmq_pub(pub_config_section, pub_config_channel)

     # LOGGING #
     publisher.info("Feed Script started to receive & publish.")

     while True:

-        message = h.r_queues.rpop(h.sub_channel + h.subscriber_name)
+        message = h.redis_rpop()
         # Recovering the streamed message informations.
         if message is not None:
             if len(message.split()) == 3:
@@ -76,4 +76,4 @@ if __name__ == "__main__":
                 with open(filename, 'wb') as f:
                     f.write(base64.standard_b64decode(gzip64encoded))

-                h.pub_socket.send('{} {}'.format(pub_channel, filename))
+                h.zmq_pub_send(filename)
@@ -48,16 +48,16 @@ import Helper
 if __name__ == "__main__":
     publisher.channel = "Script"

-    # Publisher
-    pub_config_section = 'PubSub_Categ'
-
     config_section = 'PubSub_Words'
     config_channel = 'channel_0'
     subscriber_name = 'pubcateg'

     h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)

-    h.zmq_pub(pub_config_section)
+    # Publisher
+    pub_config_section = 'PubSub_Categ'
+
+    h.zmq_pub(pub_config_section, None)

     # SCRIPT PARSER #
     parser = argparse.ArgumentParser(
@@ -86,7 +86,7 @@ if __name__ == "__main__":
     prec_filename = None

     while True:
-        message = h.r_queues.rpop(h.sub_channel + h.subscriber_name)
+        message = h.redis_rpop()
         if message is not None:
             channel, filename, word, score = message.split()

@@ -97,8 +97,8 @@ if __name__ == "__main__":
             for categ, words_list in tmp_dict.items():

                 if word.lower() in words_list:
-                    h.pub_socket.send('{} {} {} {}'.format(
-                        categ, PST.p_path, word, score))
+                    h.pub_channel = categ
+                    h.zmq_pub_send('{} {} {}'.format(PST.p_path, word, score))

                     publisher.info(
                         'Categ;{};{};{};Detected {} "{}"'.format(
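The Categ script uses the dynamic mode added above: it creates the publisher
with config_channel=None and picks the channel per message, as the hunk
shows. Condensed into a sketch (variable values illustrative):

    h.zmq_pub('PubSub_Categ', None)   # h.pub_channel stays None for now

    # later, inside the matching loop:
    h.pub_channel = categ             # route on the matched category
    h.zmq_pub_send('{} {} {}'.format(PST.p_path, word, score))
    # zmq_pub_send() raises if pub_channel was never set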
@@ -5,10 +5,11 @@
 The ZMQ_PubSub_Lines Module
 ============================

-This module is consuming the Redis-list created by the ZMQ_PubSub_Line_Q Module.
+This module is consuming the Redis-list created by the ZMQ_PubSub_Line_Q
+Module.

-It perform a sorting on the line's length and publish/forward them to differents
-channels:
+It perform a sorting on the line's length and publish/forward them to
+differents channels:

 *Channel 1 if max length(line) < max
 *Channel 2 if max length(line) > max
@@ -28,79 +29,62 @@ Requirements
 """
 import redis
 import argparse
-import ConfigParser
 import time
 from packages import Paste
-from packages import ZMQ_PubSub
 from pubsublogger import publisher

-configfile = './packages/config.cfg'
+import Helper

+if __name__ == "__main__":
+    publisher.channel = "Script"

-def main():
-    """Main Function"""
+    config_section = 'PubSub_Global'
+    config_channel = 'channel'
+    subscriber_name = 'line'

-    # CONFIG #
-    cfg = ConfigParser.ConfigParser()
-    cfg.read(configfile)
+    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)

+    # Publisher
+    pub_config_section = 'PubSub_Longlines'
+    h.zmq_pub(pub_config_section, None)
+
+    # Subscriber
+    h.zmq_sub(config_section)

     # SCRIPT PARSER #
     parser = argparse.ArgumentParser(
-        description='''This script is a part of the Analysis Information Leak framework.''',
-        epilog='''''')
+        description='''This script is a part of the Analysis Information \
+Leak framework.''')

-    parser.add_argument('-max', type=int, default=500,
-                        help='The limit between "short lines" and "long lines" (500)',
-                        action='store')
+    parser.add_argument(
+        '-max', type=int, default=500,
+        help='The limit between "short lines" and "long lines"',
+        action='store')

     args = parser.parse_args()

     # REDIS #
+    # FIXME move it in the Paste object
     r_serv = redis.StrictRedis(
-        host=cfg.get("Redis_Data_Merging", "host"),
-        port=cfg.getint("Redis_Data_Merging", "port"),
-        db=cfg.getint("Redis_Data_Merging", "db"))
+        host=h.config.get("Redis_Data_Merging", "host"),
+        port=h.config.getint("Redis_Data_Merging", "port"),
+        db=h.config.getint("Redis_Data_Merging", "db"))

-    r_serv1 = redis.StrictRedis(
-        host=cfg.get("Redis_Queues", "host"),
-        port=cfg.getint("Redis_Queues", "port"),
-        db=cfg.getint("Redis_Queues", "db"))
-
-    # LOGGING #
-    publisher.channel = "Script"
-
-    # ZMQ #
-    # Subscriber
-    channel = cfg.get("PubSub_Global", "channel")
-    subscriber_name = "line"
-    subscriber_config_section = "PubSub_Global"
-
-    # Publisher
-    publisher_config_section = "PubSub_Longlines"
-    publisher_name = "publine"
-
-    sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name)
-
-    pub = ZMQ_PubSub.ZMQPub(configfile, publisher_config_section, publisher_name)
-
-    channel_0 = cfg.get("PubSub_Longlines", "channel_0")
-    channel_1 = cfg.get("PubSub_Longlines", "channel_1")
+    channel_0 = h.config.get("PubSub_Longlines", "channel_0")
+    channel_1 = h.config.get("PubSub_Longlines", "channel_1")

     # FUNCTIONS #
-    tmp_string = "Lines script Subscribed to channel {} and Start to publish on channel {}, {}"
-    publisher.info(tmp_string.format(
-        cfg.get("PubSub_Global", "channel"),
-        cfg.get("PubSub_Longlines", "channel_0"),
-        cfg.get("PubSub_Longlines", "channel_1")))
+    tmp_string = "Lines script Subscribed to channel {} and Start to publish \
+on channel {}, {}"
+    publisher.info(tmp_string.format(h.sub_channel, channel_0, channel_1))

     while True:
         try:
-            message = sub.get_msg_from_queue(r_serv1)
+            message = h.redis_rpop()
             if message is not None:
                 PST = Paste.Paste(message.split(" ", -1)[-1])
             else:
-                if r_serv1.sismember("SHUTDOWN_FLAGS", "Lines"):
-                    r_serv1.srem("SHUTDOWN_FLAGS", "Lines")
+                if h.redis_queue_shutdown():
                     print "Shutdown Flag Up: Terminating"
                     publisher.warning("Shutdown Flag Up: Terminating.")
                     break
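redis_queue_shutdown() replaces the sismember/srem pair that each module
open-coded against the "SHUTDOWN_FLAGS" set (the old flag names were
"Lines", "Attributes", and so on). The helper body is not part of this diff;
based on the call sites it replaces, it presumably amounts to a
consume-and-report check along these lines (hypothetical reconstruction):

    def redis_queue_shutdown(self, is_queue=False):
        # hypothetical: derive the flag for this module, consume it so the
        # module only terminates once, and report whether it was raised
        flag = self.subscriber_name  # old code used names like "Lines"
        if self.r_queues.sismember('SHUTDOWN_FLAGS', flag):
            self.r_queues.srem('SHUTDOWN_FLAGS', flag)
            return True
        return False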
@@ -111,18 +95,14 @@ def main():
                 lines_infos = PST.get_lines_info()

                 PST.save_attribute_redis(r_serv, "p_nb_lines", lines_infos[0])
-                PST.save_attribute_redis(r_serv, "p_max_length_line", lines_infos[1])
+                PST.save_attribute_redis(r_serv, "p_max_length_line",
+                                         lines_infos[1])

                 r_serv.sadd("Pastes_Objects", PST.p_path)
                 if lines_infos[1] >= args.max:
-                    msg = channel_0+" "+PST.p_path
+                    h.pub_channel = channel_0
                 else:
-                    msg = channel_1+" "+PST.p_path
-
-                pub.send_message(msg)
+                    h.pub_channel = channel_1
+                h.zmq_pub_send(PST.p_path)
         except IOError:
             print "CRC Checksum Error on : ", PST.p_path
-            pass
-
-
-if __name__ == "__main__":
-    main()
@@ -32,31 +32,29 @@ import Helper
 if __name__ == "__main__":
     publisher.channel = "Script"

-    # Publisher
-    pub_config_section = 'PubSub_Words'
-
     config_section = 'PubSub_Longlines'
     config_channel = 'channel_1'
     subscriber_name = 'tokenize'

     h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)

-    h.zmq_pub(pub_config_section)
-    pub_channel = h.config.get(pub_config_section, "channel_0")
+    # Publisher
+    pub_config_section = 'PubSub_Words'
+    pub_config_channel = 'channel_0'
+    h.zmq_pub(pub_config_section, pub_config_channel)

     # LOGGING #
     publisher.info("Tokeniser subscribed to channel {}".format(h.sub_channel))

     while True:
-        message = h.r_queues.rpop(h.sub_channel + h.subscriber_name)
+        message = h.redis_rpop()
         print message
         if message is not None:
             paste = Paste.Paste(message.split(" ", -1)[-1])
             for word, score in paste._get_top_words().items():
                 if len(word) >= 4:
-                    h.pub_socket.send(
-                        '{} {} {} {}'.format(pub_channel, paste.p_path,
-                                             word, score))
+                    h.zmq_pub_send('{} {} {}'.format(paste.p_path, word,
+                                                     score))
         else:
             if h.redis_queue_shutdown():
                 print "Shutdown Flag Up: Terminating"
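With the channel prefix now added inside zmq_pub_send(), the message format
between Tokenize and its consumers is unchanged on the wire: the channel
followed by three space-separated fields. Sketch of both ends:

    # Tokenize publishes:
    h.zmq_pub_send('{} {} {}'.format(paste.p_path, word, score))
    # subscribers receive '<channel> <path> <word> <score>' and unpack it:
    channel, filename, word, score = message.split()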
@@ -37,17 +37,17 @@ if __name__ == "__main__":

     # Publisher
     pub_config_section = 'PubSub_Global'
-    h.zmq_pub(pub_config_section)
-    pub_channel = h.config.get(pub_config_section, "channel")
+    pub_config_channel = 'channel'
+    h.zmq_pub(pub_config_section, pub_config_channel)

     # LOGGING #
     publisher.info("Starting to publish.")

     while True:
-        filename = h.r_queues.lpop(h.sub_channel)
+        filename = h.redis_rpop()

         if filename is not None:
-            h.pub_socket.send('{} {}'.format(pub_channel, filename))
+            h.zmq_pub_send(filename)
         else:
             time.sleep(10)
             publisher.debug("Nothing to publish")
@@ -27,56 +27,41 @@ Requirements

 """
 import redis
-import ConfigParser
 import time
 from packages import Paste
-from packages import ZMQ_PubSub
 from pubsublogger import publisher

-configfile = './packages/config.cfg'
+import Helper

+if __name__ == "__main__":
+    publisher.channel = "Script"

-def main():
-    """Main Function"""
+    config_section = 'PubSub_Global'
+    config_channel = 'channel'
+    subscriber_name = 'attributes'

-    # CONFIG #
-    cfg = ConfigParser.ConfigParser()
-    cfg.read(configfile)
+    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)

+    # Subscriber
+    h.zmq_sub(config_section)

     # REDIS #
     r_serv = redis.StrictRedis(
-        host=cfg.get("Redis_Data_Merging", "host"),
-        port=cfg.getint("Redis_Data_Merging", "port"),
-        db=cfg.getint("Redis_Data_Merging", "db"))
+        host=h.config.get("Redis_Data_Merging", "host"),
+        port=h.config.getint("Redis_Data_Merging", "port"),
+        db=h.config.getint("Redis_Data_Merging", "db"))

-    r_serv1 = redis.StrictRedis(
-        host=cfg.get("Redis_Queues", "host"),
-        port=cfg.getint("Redis_Queues", "port"),
-        db=cfg.getint("Redis_Queues", "db"))
-
-    # LOGGING #
-    publisher.channel = "Script"
-
-    # ZMQ #
-    # Subscriber
-    channel = cfg.get("PubSub_Global", "channel")
-    subscriber_name = "attributes"
-    subscriber_config_section = "PubSub_Global"
-
-    sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name)
-
     # FUNCTIONS #
     publisher.info("""ZMQ Attribute is Running""")

     while True:
         try:
-            message = sub.get_msg_from_queue(r_serv1)
+            message = h.redis_rpop()

             if message is not None:
                 PST = Paste.Paste(message.split(" ", -1)[-1])
             else:
-                if r_serv1.sismember("SHUTDOWN_FLAGS", "Attributes"):
-                    r_serv1.srem("SHUTDOWN_FLAGS", "Attributes")
+                if h.redis_queue_shutdown():
                     print "Shutdown Flag Up: Terminating"
                     publisher.warning("Shutdown Flag Up: Terminating.")
                     break
@@ -95,9 +80,6 @@ def main():
                 PST.save_all_attributes_redis(r_serv)
         except IOError:
             print "CRC Checksum Failed on :", PST.p_path
-            publisher.error('{0};{1};{2};{3};{4}'.format("Duplicate", PST.p_source, PST.p_date, PST.p_name, "CRC Checksum Failed"))
-            pass
-
-
-if __name__ == "__main__":
-    main()
+            publisher.error('{0};{1};{2};{3};{4}'.format(
+                "Duplicate", PST.p_source, PST.p_date, PST.p_name,
+                "CRC Checksum Failed"))
@@ -1,53 +1,44 @@
 #!/usr/bin/env python2
 # -*-coding:UTF-8 -*
 import redis
-import ConfigParser
 import pprint
 import time
 from packages import Paste
 from packages import lib_refine
-from packages import ZMQ_PubSub
 from pubsublogger import publisher

-configfile = './packages/config.cfg'
-
-
-def main():
-    """Main Function"""
-
-    # CONFIG #
-    cfg = ConfigParser.ConfigParser()
-    cfg.read(configfile)
-
-    # REDIS #
-    r_serv = redis.StrictRedis(
-        host=cfg.get("Redis_Queues", "host"),
-        port=cfg.getint("Redis_Queues", "port"),
-        db=cfg.getint("Redis_Queues", "db"))
-
-    r_serv1 = redis.StrictRedis(
-        host=cfg.get("Redis_Data_Merging", "host"),
-        port=cfg.getint("Redis_Data_Merging", "port"),
-        db=cfg.getint("Redis_Data_Merging", "db"))
-
-    # LOGGING #
+import Helper
+
+if __name__ == "__main__":
     publisher.channel = "Script"

-    # ZMQ #
-    sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "creditcard_categ", "cards")
+    config_section = 'PubSub_Categ'
+    config_channel = 'channel_0'
+    subscriber_name = 'cards'
+
+    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
+
+    # Subscriber
+    h.zmq_sub(config_section)
+
+    # REDIS #
+    r_serv1 = redis.StrictRedis(
+        host=h.config.get("Redis_Data_Merging", "host"),
+        port=h.config.getint("Redis_Data_Merging", "port"),
+        db=h.config.getint("Redis_Data_Merging", "db"))

     # FUNCTIONS #
     publisher.info("Creditcard script subscribed to channel creditcard_categ")

-    message = sub.get_msg_from_queue(r_serv)
+    message = h.redis_rpop()
     prec_filename = None

     creditcard_regex = "4[0-9]{12}(?:[0-9]{3})?"

     # mastercard_regex = "5[1-5]\d{2}([\ \-]?)\d{4}\1\d{4}\1\d{4}"
     # visa_regex = "4\d{3}([\ \-]?)\d{4}\1\d{4}\1\d{4}"
-    # discover_regex = "6(?:011\d\d|5\d{4}|4[4-9]\d{3}|22(?:1(?:2[6-9]|[3-9]\d)|[2-8]\d\d|9(?:[01]\d|2[0-5])))\d{10}"
+    # discover_regex = "6(?:011\d\d|5\d{4}|4[4-9]\d{3}|22(?:1(?:2[6-9]|
+    # [3-9]\d)|[2-8]\d\d|9(?:[01]\d|2[0-5])))\d{10}"
     # jcb_regex = "35(?:2[89]|[3-8]\d)([\ \-]?)\d{4}\1\d{4}\1\d{4}"
     # amex_regex = "3[47]\d\d([\ \-]?)\d{6}\1\d{5}"
     # chinaUP_regex = "62[0-5]\d{13,16}"
@@ -69,25 +60,22 @@ def main():
                     PST.save_attribute_redis(r_serv1, channel, creditcard_set)

                     pprint.pprint(creditcard_set)
-                    to_print = 'CreditCard;{};{};{};'.format(PST.p_source, PST.p_date, PST.p_name)
+                    to_print = 'CreditCard;{};{};{};'.format(
+                        PST.p_source, PST.p_date, PST.p_name)
                     if (len(creditcard_set) > 0):
-                        publisher.critical('{}Checked {} valid number(s)'.format(to_print, len(creditcard_set)))
+                        publisher.critical('{}Checked {} valid number(s)'.format(
+                            to_print, len(creditcard_set)))
                     else:
                         publisher.info('{}CreditCard related'.format(to_print))

                 prec_filename = filename

             else:
-                if r_serv.sismember("SHUTDOWN_FLAGS", "Creditcards"):
-                    r_serv.srem("SHUTDOWN_FLAGS", "Creditcards")
+                if h.redis_queue_shutdown():
                     print "Shutdown Flag Up: Terminating"
                     publisher.warning("Shutdown Flag Up: Terminating.")
                     break
                 publisher.debug("Script creditcard is idling 1m")
                 time.sleep(60)

-            message = sub.get_msg_from_queue(r_serv)
-
-
-if __name__ == "__main__":
-    main()
+            message = h.redis_rpop()
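For reference, the one active pattern in this module matches Visa-style
numbers: a leading 4 followed by 12 digits, optionally 3 more (13- or
16-digit PANs). A quick worked example with a standard test number:

    import re
    creditcard_regex = "4[0-9]{12}(?:[0-9]{3})?"
    re.findall(creditcard_regex, "paid with 4012888888881881 yesterday")
    # -> ['4012888888881881']

The commented-out regexes for the other card brands are only re-wrapped to
shorter lines here, not enabled.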
@@ -6,7 +6,8 @@ The ZMQ_Sub_Curve Module

 This module is consuming the Redis-list created by the ZMQ_Sub_Curve_Q Module.

-This modules update a .csv file used to draw curves representing selected words and their occurency per day.
+This modules update a .csv file used to draw curves representing selected
+words and their occurency per day.

 ..note:: The channel will have the name of the file created.

@@ -22,72 +23,60 @@ Requirements

 """
 import redis
-import ConfigParser
 import time
-from packages import Paste as P
-from packages import ZMQ_PubSub
+from packages import Paste
 from pubsublogger import publisher
 from packages import lib_words

-configfile = './packages/config.cfg'
+import Helper

+if __name__ == "__main__":

-def main():
-    """Main Function"""
-
-    # CONFIG #
-    cfg = ConfigParser.ConfigParser()
-    cfg.read(configfile)
-
-    # REDIS #
-    r_serv = redis.StrictRedis(
-        host=cfg.get("Redis_Queues", "host"),
-        port=cfg.getint("Redis_Queues", "port"),
-        db=cfg.getint("Redis_Queues", "db"))
-
-    r_serv1 = redis.StrictRedis(
-        host=cfg.get("Redis_Level_DB", "host"),
-        port=cfg.get("Redis_Level_DB", "port"),
-        db=0)
-
-    # LOGGING #
     publisher.channel = "Script"

-    # ZMQ #
-    channel = cfg.get("PubSub_Words", "channel_0")
+    config_section = 'PubSub_Words'
+    config_channel = 'channel_0'
     subscriber_name = "curve"
-    subscriber_config_section = "PubSub_Words"

-    sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name)
+    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)

+    # Subscriber
+    h.zmq_sub(config_section)
+
+    # REDIS #
+    r_serv1 = redis.StrictRedis(
+        host=h.config.get("Redis_Level_DB", "host"),
+        port=h.config.get("Redis_Level_DB", "port"),
+        db=h.config.get("Redis_Level_DB", "db"))

     # FUNCTIONS #
-    publisher.info("Script Curve subscribed to channel {0}".format(cfg.get("PubSub_Words", "channel_0")))
+    publisher.info("Script Curve subscribed to {}".format(h.sub_channel))

     # FILE CURVE SECTION #
-    csv_path = cfg.get("Directories", "wordtrending_csv")
-    wordfile_path = cfg.get("Directories", "wordsfile")
+    csv_path = h.config.get("Directories", "wordtrending_csv")
+    wordfile_path = h.config.get("Directories", "wordsfile")

-    message = sub.get_msg_from_queue(r_serv)
+    message = h.redis_rpop()
     prec_filename = None
     while True:
         if message is not None:
             channel, filename, word, score = message.split()
             if prec_filename is None or filename != prec_filename:
-                PST = P.Paste(filename)
-                lib_words.create_curve_with_word_file(r_serv1, csv_path, wordfile_path, int(PST.p_date.year), int(PST.p_date.month))
+                PST = Paste.Paste(filename)
+                lib_words.create_curve_with_word_file(
+                    r_serv1, csv_path, wordfile_path, int(PST.p_date.year),
+                    int(PST.p_date.month))

             prec_filename = filename
             prev_score = r_serv1.hget(word.lower(), PST.p_date)
             print prev_score
             if prev_score is not None:
-                r_serv1.hset(word.lower(), PST.p_date, int(prev_score) + int(score))
+                r_serv1.hset(word.lower(), PST.p_date,
+                             int(prev_score) + int(score))
             else:
                 r_serv1.hset(word.lower(), PST.p_date, score)
-                # r_serv.expire(word,86400) #1day

         else:
-            if r_serv.sismember("SHUTDOWN_FLAGS", "Curve"):
-                r_serv.srem("SHUTDOWN_FLAGS", "Curve")
+            if h.redis_queue_shutdown():
                 print "Shutdown Flag Up: Terminating"
                 publisher.warning("Shutdown Flag Up: Terminating.")
                 break
@@ -95,8 +84,4 @@ def main():
             print "sleepin"
             time.sleep(1)

-            message = sub.get_msg_from_queue(r_serv)
-
-
-if __name__ == "__main__":
-    main()
+            message = h.redis_rpop()
@@ -13,61 +13,51 @@ Requirements:

 """
 import redis
-import ConfigParser
 import os
 import time
 from packages import Paste
-from packages import ZMQ_PubSub
 from pubsublogger import publisher
 from pybloomfilter import BloomFilter

-configfile = './packages/config.cfg'
+import Helper

+if __name__ == "__main__":
+    publisher.channel = "Script"

-def main():
-    """Main Function"""
+    config_section = 'PubSub_Global'
+    config_channel = 'channel'
+    subscriber_name = 'duplicate'

-    # CONFIG #
-    cfg = ConfigParser.ConfigParser()
-    cfg.read(configfile)
+    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)

+    # Subscriber
+    h.zmq_sub(config_section)

     # REDIS #
-    # DB QUEUE ( MEMORY )
-    r_Q_serv = redis.StrictRedis(
-        host=cfg.get("Redis_Queues", "host"),
-        port=cfg.getint("Redis_Queues", "port"),
-        db=cfg.getint("Redis_Queues", "db"))
-
     r_serv_merge = redis.StrictRedis(
-        host=cfg.get("Redis_Data_Merging", "host"),
-        port=cfg.getint("Redis_Data_Merging", "port"),
-        db=cfg.getint("Redis_Data_Merging", "db"))
+        host=h.config.get("Redis_Data_Merging", "host"),
+        port=h.config.getint("Redis_Data_Merging", "port"),
+        db=h.config.getint("Redis_Data_Merging", "db"))

     # REDIS #
     # DB OBJECT & HASHS ( DISK )
+    # FIXME increase flexibility
     dico_redis = {}
     for year in xrange(2013, 2015):
         for month in xrange(0, 16):
             dico_redis[str(year)+str(month).zfill(2)] = redis.StrictRedis(
-                host=cfg.get("Redis_Level_DB", "host"),
-                port=year,
+                host=h.config.get("Redis_Level_DB", "host"), port=year,
                 db=month)

-    # LOGGING #
-    publisher.channel = "Script"
-
-    # ZMQ #
-    channel = cfg.get("PubSub_Global", "channel")
-    subscriber_name = "duplicate"
-    subscriber_config_section = "PubSub_Global"
-
-    sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name)
-
     # FUNCTIONS #
-    publisher.info("""Script duplicate subscribed to channel {0}""".format(cfg.get("PubSub_Global", "channel")))
+    publisher.info("""Script duplicate subscribed to channel {0}""".format(
+        h.config.get("PubSub_Global", "channel")))

     set_limit = 100
+    bloompath = os.path.join(os.environ('AIL_BIN'),
+                             h.config.get("Directories", "bloomfilters"))
+
+    bloop_path_set = set()
     while True:
         try:
             super_dico = {}
@@ -77,15 +67,14 @@ def main():

             x = time.time()

-            message = sub.get_msg_from_queue(r_Q_serv)
+            message = h.redis_rpop()
             if message is not None:
                 path = message.split(" ", -1)[-1]
                 PST = Paste.Paste(path)
             else:
                 publisher.debug("Script Attribute is idling 10s")
                 time.sleep(10)
-                if r_Q_serv.sismember("SHUTDOWN_FLAGS", "Duplicate"):
-                    r_Q_serv.srem("SHUTDOWN_FLAGS", "Duplicate")
+                if h.redis_queue_shutdown():
                     print "Shutdown Flag Up: Terminating"
                     publisher.warning("Shutdown Flag Up: Terminating.")
                     break
@@ -97,19 +86,14 @@ def main():
             r_serv1 = dico_redis[PST.p_date.year + PST.p_date.month]

             # Creating the bloom filter name: bloomyyyymm
-            bloomname = 'bloom' + PST.p_date.year + PST.p_date.month
-
-            bloompath = cfg.get("Directories", "bloomfilters")
-
-            filebloompath = bloompath + bloomname
-
-            # datetime.date(int(PST.p_date.year),int(PST.p_date.month),int(PST.p_date.day)).timetuple().tm_yday % 7
+            filebloompath = os.path.join(bloompath, 'bloom' + PST.p_date.year +
+                                         PST.p_date.month)

             if os.path.exists(filebloompath):
                 bloom = BloomFilter.open(filebloompath)
             else:
                 bloom = BloomFilter(100000000, 0.01, filebloompath)
-                r_Q_serv.sadd("bloomlist", filebloompath)
+                bloop_path_set.add(filebloompath)

             # UNIQUE INDEX HASHS TABLE
             r_serv0 = dico_redis["201300"]
@@ -121,45 +105,43 @@ def main():

             # For each bloom filter
             opened_bloom = []
-            for bloo in r_Q_serv.smembers("bloomlist"):
+            for bloo in bloop_path_set:
                 # Opening blooms
                 opened_bloom.append(BloomFilter.open(bloo))

             # For each hash of the paste
-            for hash in PST._get_hash_lines(min=5, start=1, jump=0):
+            for line_hash in PST._get_hash_lines(min=5, start=1, jump=0):
                 nb_hash_current += 1

                 # Adding the hash in Redis & limiting the set
-                if r_serv1.scard(hash) <= set_limit:
-                    r_serv1.sadd(hash, index)
-                    r_serv1.sadd("HASHS", hash)
+                if r_serv1.scard(line_hash) <= set_limit:
+                    r_serv1.sadd(line_hash, index)
+                    r_serv1.sadd("HASHS", line_hash)
                 # Adding the hash in the bloom of the month
-                bloom.add(hash)
+                bloom.add(line_hash)

                 # Go throught the Database of the bloom filter (of the month)
                 for bloo in opened_bloom:
-                    if hash in bloo:
+                    if line_hash in bloo:
                         db = bloo.name[-6:]
-                        # Go throught the Database of the bloom filter (of the month)
+                        # Go throught the Database of the bloom filter (month)
                         r_serv_bloom = dico_redis[db]

                         # set of index paste: set([1,2,4,65])
-                        hash_current = r_serv_bloom.smembers(hash)
+                        hash_current = r_serv_bloom.smembers(line_hash)
                         # removing itself from the list
                         hash_current = hash_current - set([index])

-                        # if the hash is present at least in 1 files (already processed)
+                        # if the hash is present at least in 1 files
+                        # (already processed)
                         if len(hash_current) != 0:
-                            hash_dico[hash] = hash_current
+                            hash_dico[line_hash] = hash_current

                         # if there is data in this dictionnary
                         if len(hash_dico) != 0:
                             super_dico[index] = hash_dico
-                    else:
-                        # The hash is not in this bloom
-                        pass

-            ###########################################################################################
+            ###########################################################################

             # if there is data in this dictionnary
             if len(super_dico) != 0:
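Condensed, the per-paste flow that these renames tidy up (hash was shadowing
the Python builtin, hence line_hash) is: hash each qualifying line, record
it in the current month's bloom filter, then probe every opened monthly
filter and, on a probable hit, fetch the paste indexes already holding that
hash. A sketch of the core of that loop:

    for line_hash in PST._get_hash_lines(min=5, start=1, jump=0):
        bloom.add(line_hash)                  # this month's filter
        for bloo in opened_bloom:             # every known monthly filter
            if line_hash in bloo:             # probable earlier sighting
                r_serv_bloom = dico_redis[bloo.name[-6:]]
                dups = r_serv_bloom.smembers(line_hash) - set([index])
                if len(dups) != 0:
                    hash_dico[line_hash] = dups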
@@ -171,7 +153,8 @@ def main():

                 for p_fname in pset:
                     occur_dico.setdefault(p_fname, 0)
-                    # Count how much hash is similar per file occuring in the dictionnary
+                    # Count how much hash is similar per file occuring
+                    # in the dictionnary
                     if occur_dico[p_fname] >= 0:
                         occur_dico[p_fname] = occur_dico[p_fname] + 1

@@ -181,7 +164,8 @@ def main():
                 dupl.append((paste, percentage))

             # Creating the object attribute and save it.
-            to_print = 'Duplicate;{};{};{};'.format(PST.p_source, PST.p_date, PST.p_name)
+            to_print = 'Duplicate;{};{};{};'.format(
+                PST.p_source, PST.p_date, PST.p_name)
             if dupl != []:
                 PST.__setattr__("p_duplicate", dupl)
                 PST.save_attribute_redis(r_serv_merge, "p_duplicate", dupl)
@@ -193,7 +177,3 @@ def main():
         except IOError:
             print "CRC Checksum Failed on :", PST.p_path
             publisher.error('{}CRC Checksum Failed'.format(to_print))
-            pass
-
-
-if __name__ == "__main__":
-    main()
@@ -52,7 +52,7 @@ if __name__ == "__main__":

     while True:
         try:
-            message = h.r_queues.rpop(h.sub_channel + h.subscriber_name)
+            message = h.redis_rpop()

             if message is not None:
                 PST = Paste.Paste(message.split(" ", -1)[-1])
@@ -2,53 +2,47 @@
 # -*-coding:UTF-8 -*

 import redis
-import ConfigParser
 import pprint
 import time
 import dns.exception
-from packages import Paste as P
+from packages import Paste
 from packages import lib_refine
-from packages import ZMQ_PubSub
 from pubsublogger import publisher

-configfile = './packages/config.cfg'
+import Helper

+if __name__ == "__main__":

-def main():
-    """Main Function"""
-
-    # CONFIG #
-    cfg = ConfigParser.ConfigParser()
-    cfg.read(configfile)
-
-    # REDIS #
-    r_serv = redis.StrictRedis(
-        host=cfg.get("Redis_Queues", "host"),
-        port=cfg.getint("Redis_Queues", "port"),
-        db=cfg.getint("Redis_Queues", "db"))
-
-    r_serv1 = redis.StrictRedis(
-        host=cfg.get("Redis_Data_Merging", "host"),
-        port=cfg.getint("Redis_Data_Merging", "port"),
-        db=cfg.getint("Redis_Data_Merging", "db"))
-
-    r_serv2 = redis.StrictRedis(
-        host=cfg.get("Redis_Cache", "host"),
-        port=cfg.getint("Redis_Cache", "port"),
-        db=cfg.getint("Redis_Cache", "db"))
-
-    # LOGGING #
     publisher.channel = "Script"

-    # ZMQ #
-    sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "mails_categ", "emails")
+    config_section = 'PubSub_Categ'
+    config_channel = 'channel_1'
+    subscriber_name = 'emails'
+
+    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
+
+    # Subscriber
+    h.zmq_sub(config_section)
+
+    # REDIS #
+    r_serv1 = redis.StrictRedis(
+        host=h.config.get("Redis_Data_Merging", "host"),
+        port=h.config.getint("Redis_Data_Merging", "port"),
+        db=h.config.getint("Redis_Data_Merging", "db"))
+
+    r_serv2 = redis.StrictRedis(
+        host=h.config.get("Redis_Cache", "host"),
+        port=h.config.getint("Redis_Cache", "port"),
+        db=h.config.getint("Redis_Cache", "db"))

     # FUNCTIONS #
     publisher.info("Suscribed to channel mails_categ")

-    message = sub.get_msg_from_queue(r_serv)
+    message = h.redis_rpop()
     prec_filename = None

+    # Log as critical if there are more that that amout of valid emails
+    is_critical = 10
+
     email_regex = "[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}"

     while True:
@@ -57,36 +51,36 @@ def main():
                 channel, filename, word, score = message.split()

                 if prec_filename is None or filename != prec_filename:
-                    PST = P.Paste(filename)
-                    MX_values = lib_refine.checking_MX_record(r_serv2, PST.get_regex(email_regex))
+                    PST = Paste.Paste(filename)
+                    MX_values = lib_refine.checking_MX_record(
+                        r_serv2, PST.get_regex(email_regex))

                     if MX_values[0] >= 1:

                         PST.__setattr__(channel, MX_values)
-                        PST.save_attribute_redis(r_serv1, channel, (MX_values[0], list(MX_values[1])))
+                        PST.save_attribute_redis(r_serv1, channel,
+                                                 (MX_values[0],
+                                                  list(MX_values[1])))

                         pprint.pprint(MX_values)
-                        to_print = 'Mails;{};{};{};Checked {} e-mail(s)'.format(PST.p_source, PST.p_date, PST.p_name, MX_values[0])
-                        if MX_values[0] > 10:
+                        to_print = 'Mails;{};{};{};Checked {} e-mail(s)'.\
+                            format(PST.p_source, PST.p_date, PST.p_name,
+                                   MX_values[0])
+                        if MX_values[0] > is_critical:
                             publisher.warning(to_print)
                         else:
                             publisher.info(to_print)
                 prec_filename = filename

             else:
-                if r_serv.sismember("SHUTDOWN_FLAGS", "Mails"):
-                    r_serv.srem("SHUTDOWN_FLAGS", "Mails")
+                if h.redis_queue_shutdown():
                     print "Shutdown Flag Up: Terminating"
                     publisher.warning("Shutdown Flag Up: Terminating.")
                     break
                 publisher.debug("Script Mails is Idling 10s")
                 time.sleep(10)

-            message = sub.get_msg_from_queue(r_serv)
+            message = h.redis_rpop()
         except dns.exception.Timeout:
+            # FIXME retry!
             print "dns.exception.Timeout"
-            pass
@@ -6,8 +6,8 @@ The ZMQ_Sub_Onion Module

 This module is consuming the Redis-list created by the ZMQ_Sub_Onion_Q Module.

-It trying to extract url from paste and returning only ones which are tor related
-(.onion)
+It trying to extract url from paste and returning only ones which are tor
+related (.onion)

 ..seealso:: Paste method (get_regex)

@@ -22,45 +22,37 @@ Requirements

 """
 import redis
-import ConfigParser
 import pprint
 import time
 from packages import Paste
-from packages import ZMQ_PubSub
 from pubsublogger import publisher

-configfile = './packages/config.cfg'
-
+import Helper

-def main():
-    """Main Function"""
+if __name__ == "__main__":

-    # CONFIG #
-    cfg = ConfigParser.ConfigParser()
-    cfg.read(configfile)
-
-    # REDIS #
-    r_serv = redis.StrictRedis(
-        host=cfg.get("Redis_Queues", "host"),
-        port=cfg.getint("Redis_Queues", "port"),
-        db=cfg.getint("Redis_Queues", "db"))
-
-    r_serv1 = redis.StrictRedis(
-        host=cfg.get("Redis_Data_Merging", "host"),
-        port=cfg.getint("Redis_Data_Merging", "port"),
-        db=cfg.getint("Redis_Data_Merging", "db"))
-
-    # LOGGING #
     publisher.channel = "Script"

-    # ZMQ #
-    Sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "onion_categ", "tor")
+    config_section = 'PubSub_Categ'
+    config_channel = 'channel_2'
+    subscriber_name = 'tor'
+
+    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
+
+    # Subscriber
+    h.zmq_sub(config_section)
+
+    # REDIS #
+    r_serv1 = redis.StrictRedis(
+        host=h.config.get("Redis_Data_Merging", "host"),
+        port=h.config.getint("Redis_Data_Merging", "port"),
+        db=h.config.getint("Redis_Data_Merging", "db"))

     # FUNCTIONS #
     publisher.info("Script subscribed to channel onion_categ")

     # Getting the first message from redis.
-    message = Sub.get_msg_from_queue(r_serv)
+    message = h.redis_rpop()
     prec_filename = None

     # Thanks to Faup project for this regex
@@ -78,7 +70,8 @@ def main():

                     for x in PST.get_regex(url_regex):
                         # Extracting url with regex
-                        credential, subdomain, domain, host, tld, port, resource_path, query_string, f1, f2, f3, f4 = x
+                        credential, subdomain, domain, host, tld, port, \
+                            resource_path, query_string, f1, f2, f3, f4 = x

                         if f1 == "onion":
                             domains_list.append(domain)
@@ -88,25 +81,22 @@ def main():
                     PST.save_attribute_redis(r_serv1, channel, domains_list)
                     pprint.pprint(domains_list)
                     print PST.p_path
-                    to_print = 'Onion;{};{};{};'.format(PST.p_source, PST.p_date, PST.p_name)
+                    to_print = 'Onion;{};{};{};'.format(PST.p_source, PST.p_date,
+                                                        PST.p_name)
                     if len(domains_list) > 0:
-                        publisher.warning('{}Detected {} .onion(s)'.format(to_print, len(domains_list)))
+                        publisher.warning('{}Detected {} .onion(s)'.format(
+                            to_print, len(domains_list)))
                     else:
                         publisher.info('{}Onion related'.format(to_print))

                 prec_filename = filename

             else:
-                if r_serv.sismember("SHUTDOWN_FLAGS", "Onion"):
-                    r_serv.srem("SHUTDOWN_FLAGS", "Onion")
+                if h.redis_queue_shutdown():
                     print "Shutdown Flag Up: Terminating"
                     publisher.warning("Shutdown Flag Up: Terminating.")
                     break
                 publisher.debug("Script url is Idling 10s")
                 time.sleep(10)

-            message = Sub.get_msg_from_queue(r_serv)
-
-
-if __name__ == "__main__":
-    main()
+            message = h.redis_rpop()
@@ -1,13 +1,11 @@
 #!/usr/bin/env python2
 # -*-coding:UTF-8 -*
 import redis
-import ConfigParser
 import pprint
 import time
 import dns.exception
 from packages import Paste
 from packages import lib_refine
-from packages import ZMQ_PubSub
 from pubsublogger import publisher

 # Country and ASN lookup
@@ -16,55 +14,43 @@ import socket
 import pycountry
 import ipaddress

-configfile = './packages/config.cfg'
+import Helper

+if __name__ == "__main__":

-def main():
-    """Main Function"""
-
-    # CONFIG #
-    cfg = ConfigParser.ConfigParser()
-    cfg.read(configfile)
-
-    # REDIS #
-    r_serv = redis.StrictRedis(
-        host=cfg.get("Redis_Queues", "host"),
-        port=cfg.getint("Redis_Queues", "port"),
-        db=cfg.getint("Redis_Queues", "db"))
-
-    r_serv1 = redis.StrictRedis(
-        host=cfg.get("Redis_Data_Merging", "host"),
-        port=cfg.getint("Redis_Data_Merging", "port"),
-        db=cfg.getint("Redis_Data_Merging", "db"))
-
-    r_serv2 = redis.StrictRedis(
-        host=cfg.get("Redis_Cache", "host"),
-        port=cfg.getint("Redis_Cache", "port"),
-        db=cfg.getint("Redis_Cache", "db"))
-
-    # LOGGING #
     publisher.channel = "Script"

-    # ZMQ #
-    # Subscriber
+    config_section = 'PubSub_Categ'
+    config_channel = 'channel_3'
     subscriber_name = "urls"
-    subscriber_config_section = "PubSub_Categ"
+
+    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)

     # Publisher
-    publisher_config_section = "PubSub_Url"
-    publisher_name = "adress"
-    pubchannel = cfg.get("PubSub_Url", "channel")
+    pub_config_section = "PubSub_Url"
+    pub_config_channel = 'channel'
+    h.zmq_pub(pub_config_section, pub_config_channel)
+
+    # Subscriber
+    h.zmq_sub(config_section)
+
+    # REDIS #
+    r_serv1 = redis.StrictRedis(
+        host=h.config.get("Redis_Data_Merging", "host"),
+        port=h.config.getint("Redis_Data_Merging", "port"),
+        db=h.config.getint("Redis_Data_Merging", "db"))
+
+    r_serv2 = redis.StrictRedis(
+        host=h.config.get("Redis_Cache", "host"),
+        port=h.config.getint("Redis_Cache", "port"),
+        db=h.config.getint("Redis_Cache", "db"))

     # Country to log as critical
-    cc_critical = cfg.get("PubSub_Url", "cc_critical")
-
-    sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, "web_categ", subscriber_name)
-    pub = ZMQ_PubSub.ZMQPub(configfile, publisher_config_section, publisher_name)
+    cc_critical = h.config.get("PubSub_Url", "cc_critical")

     # FUNCTIONS #
     publisher.info("Script URL subscribed to channel web_categ")

-    message = sub.get_msg_from_queue(r_serv)
+    message = h.redis_rpop()
     prec_filename = None

     url_regex = "(http|https|ftp)\://([a-zA-Z0-9\.\-]+(\:[a-zA-Z0-9\.&%\$\-]+)*@)*((25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9])\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[0-9])|localhost|([a-zA-Z0-9\-]+\.)*[a-zA-Z0-9\-]+\.(com|edu|gov|int|mil|net|org|biz|arpa|info|name|pro|aero|coop|museum|[a-zA-Z]{2}))(\:[0-9]+)*(/($|[a-zA-Z0-9\.\,\?\'\\\+&%\$#\=~_\-]+))*"
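Like Tokenize and Categ, the Urls module is both a subscriber and a
publisher, so its setup exercises the whole new Helper surface. Condensed:

    h = Helper.Redis_Queues('PubSub_Categ', 'channel_3', 'urls')
    h.zmq_pub('PubSub_Url', 'channel')   # fixed outgoing channel
    h.zmq_sub('PubSub_Categ')            # incoming categorised pastes
    # per extracted URL, later in the loop:
    h.zmq_pub_send(str(x))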
@@ -79,11 +65,12 @@ def main():
                     PST = Paste.Paste(filename)
                     client = ip2asn()
                     for x in PST.get_regex(url_regex):
-                        scheme, credential, subdomain, domain, host, tld, port, resource_path, query_string, f1, f2, f3, f4 = x
+                        scheme, credential, subdomain, domain, host, tld, \
+                            port, resource_path, query_string, f1, f2, f3, \
+                            f4 = x
                         domains_list.append(domain)
-                        msg = pubchannel + " " + str(x)
-                        pub.send_message(msg)
-                        publisher.debug('{0} Published'.format(x))
+                        h.zmq_pub_send(str(x))
+                        publisher.debug('{} Published'.format(x))

                         if f1 == "onion":
                             print domain
|
||||||
# EU is not an official ISO 3166 code (but used by RIPE
|
# EU is not an official ISO 3166 code (but used by RIPE
|
||||||
# IP allocation)
|
# IP allocation)
|
||||||
if cc is not None and cc != "EU":
|
if cc is not None and cc != "EU":
|
||||||
print hostl, asn, cc, pycountry.countries.get(alpha2=cc).name
|
print hostl, asn, cc, \
|
||||||
|
pycountry.countries.get(alpha2=cc).name
|
||||||
if cc == cc_critical:
|
if cc == cc_critical:
|
||||||
# FIXME: That's going to fail.
|
# FIXME: That's going to fail.
|
||||||
publisher.warning('{0};{1};{2};{3};{4}'.format("Url", PST.p_source, PST.p_date, PST.p_name, "Detected " + str(A_values[0]) + " " + hostl + " " + cc))
|
publisher.warning(
|
||||||
|
'Url;{};{};{};Detected {} {} {}'.format(
|
||||||
|
PST.p_source, PST.p_date, PST.p_name,
|
||||||
|
A_values[0], hostl, cc))
|
||||||
else:
|
else:
|
||||||
print hostl, asn, cc
|
print hostl, asn, cc
|
||||||
A_values = lib_refine.checking_A_record(r_serv2, domains_list)
|
A_values = lib_refine.checking_A_record(r_serv2,
|
||||||
|
domains_list)
|
||||||
|
|
||||||
if A_values[0] >= 1:
|
if A_values[0] >= 1:
|
||||||
PST.__setattr__(channel, A_values)
|
PST.__setattr__(channel, A_values)
|
||||||
PST.save_attribute_redis(r_serv1, channel, (A_values[0], list(A_values[1])))
|
PST.save_attribute_redis(r_serv1, channel,
|
||||||
|
(A_values[0],
|
||||||
|
list(A_values[1])))
|
||||||
|
|
||||||
pprint.pprint(A_values)
|
pprint.pprint(A_values)
|
||||||
publisher.info('{0};{1};{2};{3};{4}'.format("Url", PST.p_source, PST.p_date, PST.p_name, "Checked " + str(A_values[0]) + " URL"))
|
publisher.info('Url;{};{};{};Checked {} URL'.format(
|
||||||
|
PST.p_source, PST.p_date, PST.p_name, A_values[0]))
|
||||||
prec_filename = filename
|
prec_filename = filename
|
||||||
|
|
||||||
else:
|
else:
|
||||||
if r_serv.sismember("SHUTDOWN_FLAGS", "Urls"):
|
if h.redis_queue_shutdown():
|
||||||
r_serv.srem("SHUTDOWN_FLAGS", "Urls")
|
|
||||||
print "Shutdown Flag Up: Terminating"
|
print "Shutdown Flag Up: Terminating"
|
||||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||||
break
|
break
|
||||||
publisher.debug("Script url is Idling 10s")
|
publisher.debug("Script url is Idling 10s")
|
||||||
time.sleep(10)
|
time.sleep(10)
|
||||||
|
|
||||||
message = sub.get_msg_from_queue(r_serv)
|
message = h.redis_rpop()
|
||||||
except dns.exception.Timeout:
|
except dns.exception.Timeout:
|
||||||
print "dns.exception.Timeout", A_values
|
print "dns.exception.Timeout", A_values
|
||||||
pass
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
|
|
|
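Two details in this hunk deserve a note. The FIXME is presumably there because `publisher.warning` reads `A_values[0]` before `checking_A_record` has assigned `A_values`, so the first host that hits the critical country code raises a NameError. Separately, the old sismember/srem pair is folded into one helper call; a plausible shape for such a test-and-clear method (an assumption -- the actual body in Helper.py may differ, and the flag name is derived from the subscriber name here only for illustration):

    def redis_queue_shutdown(self):
        # Test-and-clear: report the shutdown request once, then remove
        # the flag so the next run starts clean.
        if self.r_queues.sismember("SHUTDOWN_FLAGS", self.subscriber_name):
            self.r_queues.srem("SHUTDOWN_FLAGS", self.subscriber_name)
            return True
        return False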
@@ -13,6 +13,7 @@
 import ConfigParser
 import argparse
 import gzip
+import os


 def readdoc(path=None):

@@ -21,7 +22,7 @@ def readdoc(path=None):
     f = gzip.open(path, 'r')
     return f.read()

-configfile = '../packages/config.cfg'
+configfile = os.path.join(os.environ('AIL_BIN'), 'packages/config.cfg')
 cfg = ConfigParser.ConfigParser()
 cfg.read(configfile)
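One catch in the new `configfile` line: `os.environ` is a mapping, not a callable, so `os.environ('AIL_BIN')` raises a TypeError as soon as the module is imported. The intended lookup was presumably subscript access, sketched here as a correction:

    import os

    # Subscript access fails with a KeyError naming the missing variable
    # when AIL_BIN is unset, instead of a misleading TypeError.
    configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg')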
@@ -1,133 +0,0 @@
-#!/usr/bin/python2.7
-
-"""
-The ``ZMQ PubSub`` Modules
-==========================
-
-"""
-
-import zmq
-
-
-class PubSub(object):
-    """
-    The PubSub class is a ``Virtual Class`` which regroup the shared attribute
-    of a Publisher ZeroMQ and a Subcriber ZeroMQ
-
-    :param config: -- (ConfigParser) Handle on the parsed config file
-    :param log_channel: -- (str) The channel used as a log channel
-    :param ps_name: -- (str) The "ID" of the Publisher/Subcriber
-
-    :return: PubSub Object
-
-    ..note:: The ps_name was implemented to separate subscriber queues in redis
-    when they are listening on a same "stream"
-    ..seealso:: Method of the ZMQSub class
-
-    ..todo:: Create Implementing a log channel as an attribute of this virtual class.
-
-    """
-    def __init__(self, config, log_channel, ps_name):
-        self._ps_name = ps_name
-        self._config_parser = config
-
-        self._context_zmq = zmq.Context()
-
-
-class ZMQPub(PubSub):
-    """
-    This class create a ZMQ Publisher which is able to send_message to a choosen socket.
-
-    :param pub_config_section: -- (str) The name of the section in the config file to get the settings
-
-    :return: ZMQPub Object
-
-    :Example:
-    Extract of the config file:
-    [PubSub_Categ]
-    adress = tcp://127.0.0.1:5003
-
-    Creating the object and sending message:
-    MyPublisher = ZMQPub('./packages/config.cfg', 'PubSub_Categ', 'pubcateg')
-
-    msg = "categ1"+" "+"Im the data sent on the categ1 channel"
-    MyPublisher.send_message(msg)
-
-    ..note:: The ps_name attribute for a publisher is "optionnal" but still required to be
-    instantiated correctly.
-
-    """
-    def __init__(self, config, pub_config_section, ps_name):
-        super(ZMQPub, self).__init__(config, "Default", ps_name)
-
-        self._pubsocket = self._context_zmq.socket(zmq.PUB)
-        self._pub_adress = self._config_parser.get(pub_config_section, "adress")
-
-        self._pubsocket.bind(self._pub_adress)
-
-    def send_message(self, message):
-        """Send a message throught the publisher socket"""
-        self._pubsocket.send(message)
-
-
-class ZMQSub(PubSub):
-    """
-    This class create a ZMQ Subcriber which is able to receive message directly or
-    throught redis as a buffer.
-
-    The redis buffer is usefull when the subcriber do a time consuming job which is
-    desynchronising it from the stream of data received.
-    The redis buffer ensures that no data will be loss waiting to be processed.
-
-    :param sub_config_section: -- (str) The name of the section in the config file to get the settings
-    :param channel: -- (str) The listening channel of the Subcriber.
-
-    :return: ZMQSub Object
-
-    :Example:
-    Extract of the config file:
-    [PubSub_Global]
-    adress = tcp://127.0.0.1:5000
-    channel = filelist
-
-    Creating the object and receiving data + pushing to redis (redis buffering):
-
-    r_serv = redis.StrictRedis(
-        host = 127.0.0.1,
-        port = 6387,
-        db = 0)
-
-    channel = cfg.get("PubSub_Global", "channel")
-    MySubscriber = ZMQSub('./packages/config.cfg',"PubSub_Global", channel, "duplicate")
-
-    while True:
-        MySubscriber.get_and_lpush(r_serv)
-
-    Inside another script use this line to retrive the data from redis.
-    ...
-    while True:
-        MySubscriber.get_msg_from_queue(r_serv)
-    ...
-
-    ..note:: If you don't want any redis buffering simply use the "get_message" method
-
-    """
-    def __init__(self, config, sub_config_section, channel, ps_name):
-        super(ZMQSub, self).__init__(config, "Default", ps_name)
-
-        self._subsocket = self._context_zmq.socket(zmq.SUB)
-        self._sub_adress = self._config_parser.get(sub_config_section, "adress")
-
-        self._subsocket.connect(self._sub_adress)
-
-        self._channel = channel
-        self._subsocket.setsockopt(zmq.SUBSCRIBE, self._channel)
-
-    def get_msg_from_queue(self, r_serv):
-        """
-        Get the first sent message from a Redis List
-
-        :return: (str) Message from Publisher
-
-        """
-        return r_serv.rpop(self._channel+self._ps_name)
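The removed class buffered the ZeroMQ stream through Redis: one side drained the SUB socket into a list, and a possibly slower worker popped messages at its own pace, so nothing was lost while it was busy. Only `get_msg_from_queue` appears in the hunk above; the producer half referenced by the docstring is sketched below as an assumption about its shape, not a copy of the deleted code:

    def get_and_lpush(self, r_serv):
        # Assumed shape: push each incoming message onto the head of the
        # per-subscriber list; get_msg_from_queue pops from the tail,
        # which drains the list in arrival order (FIFO).
        r_serv.lpush(self._channel + self._ps_name, self._subsocket.recv())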
@@ -45,7 +45,7 @@ def create_dirfile(r_serv, directory, overwrite):
         r_serv.delete("filelist")

         for x in listdirectory(directory):
-            r_serv.rpush("filelist", x)
+            r_serv.lpush("filelist", x)

         publisher.info("The list was overwritten")

@@ -53,13 +53,13 @@ def create_dirfile(r_serv, directory, overwrite):
     if r_serv.llen("filelist") == 0:

         for x in listdirectory(directory):
-            r_serv.rpush("filelist", x)
+            r_serv.lpush("filelist", x)

         publisher.info("New list created")
     else:

         for x in listdirectory(directory):
-            r_serv.rpush("filelist", x)
+            r_serv.lpush("filelist", x)

         publisher.info("The list was updated with new elements")
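The rpush-to-lpush switch matters because the consuming side pops with RPOP (`get_msg_from_queue` above, `redis_rpop` in the refactored scripts): LPUSH plus RPOP hands files back in the order they were queued, whereas RPUSH plus RPOP behaved as a stack. A quick illustration (connection parameters and key name are hypothetical):

    import redis

    r = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
    r.delete('filelist-demo')
    for name in ('a.gz', 'b.gz', 'c.gz'):
        r.lpush('filelist-demo', name)   # new entries go to the head...
    print r.rpop('filelist-demo')        # ...'a.gz' comes back first (FIFO)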