fix: [ZMQ Feeder] performance: replace zmq recv NOBLOCK by Poller

pull/486/head
Terrtia 2020-02-27 13:23:40 +01:00
parent 40b853cbe3
commit 998f8cc8e1
No known key found for this signature in database
GPG Key ID: 1E1B1F50D84613D0
3 changed files with 17 additions and 13 deletions

View File

@ -95,7 +95,7 @@ if __name__ == '__main__':
#publisher.info(to_print) #publisher.info(to_print)
time_1 = time.time() time_1 = time.time()
processed_paste = 0 processed_paste = 0
time.sleep(1) time.sleep(0.5)
continue continue
# remove PASTES_FOLDER from item path (crawled item + submited) # remove PASTES_FOLDER from item path (crawled item + submited)

View File

@ -53,13 +53,14 @@ class PubSub(object): ## TODO: remove config, use ConfigLoader by default
self.zmq_sub = True self.zmq_sub = True
context = zmq.Context() context = zmq.Context()
# Get all feeds
self.subscribers = [] self.subscribers = []
addresses = self.config.get(conn_name, 'address') addresses = self.config.get(conn_name, 'address')
for address in addresses.split(','): for address in addresses.split(','):
new_sub = context.socket(zmq.SUB) subscriber = context.socket(zmq.SUB)
new_sub.connect(address) subscriber.connect(address)
new_sub.setsockopt_string(zmq.SUBSCRIBE, channel) subscriber.setsockopt_string(zmq.SUBSCRIBE, channel)
self.subscribers.append(new_sub) self.subscribers.append(subscriber)
def setup_publish(self, conn_name): def setup_publish(self, conn_name):
if self.config.has_section(conn_name): if self.config.has_section(conn_name):
@ -96,14 +97,18 @@ class PubSub(object): ## TODO: remove config, use ConfigLoader by default
if msg.get('data', None) is not None: if msg.get('data', None) is not None:
yield msg['data'] yield msg['data']
elif self.zmq_sub: elif self.zmq_sub:
# Initialize poll set
poller = zmq.Poller()
for subscriber in self.subscribers:
poller.register(subscriber, zmq.POLLIN)
while True: while True:
for sub in self.subscribers: socks = dict(poller.poll())
try:
msg = sub.recv(zmq.NOBLOCK) for subscriber in self.subscribers:
yield msg.split(b" ", 1)[1] if subscriber in socks:
except zmq.error.Again as e: message = subscriber.recv()
time.sleep(0.2) yield message.split(b' ', 1)[1]
pass
else: else:
raise Exception('No subscribe function defined') raise Exception('No subscribe function defined')

View File

@ -186,7 +186,6 @@ if __name__ == '__main__':
print("Empty Paste: not processed") print("Empty Paste: not processed")
publisher.debug("Empty Paste: {0} not processed".format(message)) publisher.debug("Empty Paste: {0} not processed".format(message))
else: else:
print("Empty Queues: Waiting...")
if int(time.time() - time_1) > refresh_time: if int(time.time() - time_1) > refresh_time:
# update internal feeder # update internal feeder