chg: [Language] replace pycld3 by gcld3 + clean text before language detection

dev
terrtia 2024-01-15 14:17:15 +01:00
parent d6d67f6a4c
commit 1c46bb4296
No known key found for this signature in database
GPG Key ID: 1E1B1F50D84613D0
5 changed files with 123 additions and 106 deletions

View File

@ -2,9 +2,11 @@
# -*-coding:UTF-8 -*
import os
import re
import sys
import html2text
import cld3
import gcld3
from libretranslatepy import LibreTranslateAPI
sys.path.append(os.environ['AIL_BIN'])
@ -259,6 +261,91 @@ class LanguageDetector:
def get_translator_instance():
return TRANSLATOR_URL
def _get_html2text(content, ignore_links=False):
h = html2text.HTML2Text()
h.ignore_links = ignore_links
h.ignore_images = ignore_links
return h.handle(content)
def _clean_text_to_translate(content, html=False, keys_blocks=True):
if html:
content = _get_html2text(content, ignore_links=True)
# REMOVE URLS
regex = r'\b(?:http://|https://)?(?:[a-zA-Z\d-]{,63}(?:\.[a-zA-Z\d-]{,63})+)(?:\:[0-9]+)*(?:/(?:$|[a-zA-Z0-9\.\,\?\'\\\+&%\$#\=~_\-]+))*\b'
url_regex = re.compile(regex)
urls = url_regex.findall(content)
urls = sorted(urls, key=len, reverse=True)
for url in urls:
content = content.replace(url, '')
# REMOVE PGP Blocks
if keys_blocks:
regex_pgp_public_blocs = r'-----BEGIN PGP PUBLIC KEY BLOCK-----[\s\S]+?-----END PGP PUBLIC KEY BLOCK-----'
regex_pgp_signature = r'-----BEGIN PGP SIGNATURE-----[\s\S]+?-----END PGP SIGNATURE-----'
regex_pgp_message = r'-----BEGIN PGP MESSAGE-----[\s\S]+?-----END PGP MESSAGE-----'
re.compile(regex_pgp_public_blocs)
re.compile(regex_pgp_signature)
re.compile(regex_pgp_message)
res = re.findall(regex_pgp_public_blocs, content)
for it in res:
content = content.replace(it, '')
res = re.findall(regex_pgp_signature, content)
for it in res:
content = content.replace(it, '')
res = re.findall(regex_pgp_message, content)
for it in res:
content = content.replace(it, '')
return content
class LanguagesDetector:
def __init__(self, nb_langs=3, min_proportion=0.2, min_probability=0.7, min_len=0):
self.lt = LibreTranslateAPI(get_translator_instance())
try:
self.lt.languages()
except Exception:
self.lt = None
self.detector = gcld3.NNetLanguageIdentifier(min_num_bytes=0, max_num_bytes=1000)
self.nb_langs = nb_langs
self.min_proportion = min_proportion
self.min_probability = min_probability
self.min_len = min_len
def detect_gcld3(self, content):
languages = []
content = _clean_text_to_translate(content, html=True)
if self.min_len > 0:
if len(content) < self.min_len:
return languages
for lang in self.detector.FindTopNMostFreqLangs(content, num_langs=self.nb_langs):
if lang.proportion >= self.min_proportion and lang.probability >= self.min_probability and lang.is_reliable:
languages.append(lang.language)
return languages
def detect_libretranslate(self, content):
languages = []
try:
# [{"confidence": 0.6, "language": "en"}]
resp = self.lt.detect(content)
except: # TODO ERROR MESSAGE
resp = []
if resp:
for language in resp:
if language.confidence >= self.min_probability:
languages.append(language)
return languages
def detect(self, content):
# gcld3
if len(content) >= 200 or not self.lt:
language = self.detect_gcld3(content)
# libretranslate
else:
language = self.detect_libretranslate(content)
return language
class LanguageTranslator:
def __init__(self):
@ -273,9 +360,15 @@ class LanguageTranslator:
print(e)
return languages
def detect_cld3(self, content):
for lang in cld3.get_frequent_languages(content, num_langs=1):
return lang.language
def detect_gcld3(self, content):
content = _clean_text_to_translate(content, html=True)
detector = gcld3.NNetLanguageIdentifier(min_num_bytes=0, max_num_bytes=1000)
lang = detector.FindLanguage(content)
# print(lang.language)
# print(lang.is_reliable)
# print(lang.proportion)
# print(lang.probability)
return lang.language
def detect_libretranslate(self, content):
try:
@ -285,10 +378,10 @@ class LanguageTranslator:
if language:
return language[0].get('language')
def detect(self, content): # TODO replace by gcld3
# cld3
def detect(self, content):
# gcld3
if len(content) >= 200:
language = self.detect_cld3(content)
language = self.detect_gcld3(content)
# libretranslate
else:
language = self.detect_libretranslate(content)
@ -313,18 +406,22 @@ class LanguageTranslator:
translation = None
return translation
try:
LIST_LANGUAGES = LanguageTranslator().languages()
except Exception as e:
print(e)
LIST_LANGUAGES = []
LIST_LANGUAGES = []
def get_translation_languages():
global LIST_LANGUAGES
if not LIST_LANGUAGES:
try:
LIST_LANGUAGES = LanguageTranslator().languages()
except Exception as e:
print(e)
LIST_LANGUAGES = []
return LIST_LANGUAGES
if __name__ == '__main__':
t_content = ''
# t_content = ''
langg = LanguageTranslator()
# langg = LanguagesDetector()
# lang.translate(t_content, source='ru')
langg.languages()

View File

@ -7,7 +7,6 @@ import magic
import os
import re
import sys
import cld3
import html2text
from io import BytesIO
@ -23,6 +22,7 @@ from lib.ail_core import get_ail_uuid, rreplace
from lib.objects.abstract_object import AbstractObject
from lib.ConfigLoader import ConfigLoader
from lib import item_basic
from lib.Language import LanguagesDetector
from lib.data_retention_engine import update_obj_date, get_obj_date_first
from packages import Date
@ -338,21 +338,10 @@ class Item(AbstractObject):
nb_line += 1
return {'nb': nb_line, 'max_length': max_length}
# TODO RENAME ME
def get_languages(self, min_len=600, num_langs=3, min_proportion=0.2, min_probability=0.7):
all_languages = []
## CLEAN CONTENT ##
content = self.get_html2text_content(ignore_links=True)
content = remove_all_urls_from_content(self.id, item_content=content) ##########################################
# REMOVE USELESS SPACE
content = ' '.join(content.split())
#- CLEAN CONTENT -#
#print(content)
#print(len(content))
if len(content) >= min_len: # # TODO: # FIXME: check num langs limit
for lang in cld3.get_frequent_languages(content, num_langs=num_langs):
if lang.proportion >= min_proportion and lang.probability >= min_probability and lang.is_reliable:
all_languages.append(lang)
return all_languages
ld = LanguagesDetector(nb_langs=num_langs, min_proportion=min_proportion, min_probability=min_probability, min_len=min_len)
return ld.detect(self.get_content())
def get_mimetype(self, content=None):
if not content:
@ -677,24 +666,6 @@ def remove_all_urls_from_content(item_id, item_content=None):
return item_content
def get_item_languages(item_id, min_len=600, num_langs=3, min_proportion=0.2, min_probability=0.7):
all_languages = []
## CLEAN CONTENT ##
content = get_item_content_html2text(item_id, ignore_links=True)
content = remove_all_urls_from_content(item_id, item_content=content)
# REMOVE USELESS SPACE
content = ' '.join(content.split())
#- CLEAN CONTENT -#
#print(content)
#print(len(content))
if len(content) >= min_len:
for lang in cld3.get_frequent_languages(content, num_langs=num_langs):
if lang.proportion >= min_proportion and lang.probability >= min_probability and lang.is_reliable:
all_languages.append(lang)
return all_languages
# API
# def get_item(request_dict):
@ -945,13 +916,13 @@ def create_item(obj_id, obj_metadata, io_content):
# delete_item(child_id)
if __name__ == '__main__':
# if __name__ == '__main__':
# content = 'test file content'
# duplicates = {'tests/2020/01/02/test.gz': [{'algo':'ssdeep', 'similarity':75}, {'algo':'tlsh', 'similarity':45}]}
#
# item = Item('tests/2020/01/02/test_save.gz')
# item = Item('tests/2020/01/02/test_save.gz')
# item.create(content, _save=False)
filters = {'date_from': '20230101', 'date_to': '20230501', 'sources': ['crawled', 'submitted'], 'start': ':submitted/2023/04/28/submitted_2b3dd861-a75d-48e4-8cec-6108d41450da.gz'}
gen = get_all_items_objects(filters=filters)
for obj_id in gen:
print(obj_id.id)
# filters = {'date_from': '20230101', 'date_to': '20230501', 'sources': ['crawled', 'submitted'], 'start': ':submitted/2023/04/28/submitted_2b3dd861-a75d-48e4-8cec-6108d41450da.gz'}
# gen = get_all_items_objects(filters=filters)
# for obj_id in gen:
# print(obj_id.id)

View File

@ -4,8 +4,6 @@
import os
import re
import sys
import cld3
import html2text
from datetime import datetime
@ -184,14 +182,6 @@ class Message(AbstractObject):
"""
return self._set_field('translated', translation) # translation by hash ??? -> avoid translating multiple time
def get_html2text_content(self, content=None, ignore_links=False):
if not content:
content = self.get_content()
h = html2text.HTML2Text()
h.ignore_links = ignore_links
h.ignore_images = ignore_links
return h.handle(content)
# def get_ail_2_ail_payload(self):
# payload = {'raw': self.get_gzip_content(b64=True)}
# return payload
@ -287,48 +277,6 @@ class Message(AbstractObject):
# meta['encoding'] = None
return meta
def _languages_cleaner(self, content=None):
if not content:
content = self.get_content()
# REMOVE URLS
regex = r'\b(?:http://|https://)?(?:[a-zA-Z\d-]{,63}(?:\.[a-zA-Z\d-]{,63})+)(?:\:[0-9]+)*(?:/(?:$|[a-zA-Z0-9\.\,\?\'\\\+&%\$#\=~_\-]+))*\b'
url_regex = re.compile(regex)
urls = url_regex.findall(content)
urls = sorted(urls, key=len, reverse=True)
for url in urls:
content = content.replace(url, '')
# REMOVE PGP Blocks
regex_pgp_public_blocs = r'-----BEGIN PGP PUBLIC KEY BLOCK-----[\s\S]+?-----END PGP PUBLIC KEY BLOCK-----'
regex_pgp_signature = r'-----BEGIN PGP SIGNATURE-----[\s\S]+?-----END PGP SIGNATURE-----'
regex_pgp_message = r'-----BEGIN PGP MESSAGE-----[\s\S]+?-----END PGP MESSAGE-----'
re.compile(regex_pgp_public_blocs)
re.compile(regex_pgp_signature)
re.compile(regex_pgp_message)
res = re.findall(regex_pgp_public_blocs, content)
for it in res:
content = content.replace(it, '')
res = re.findall(regex_pgp_signature, content)
for it in res:
content = content.replace(it, '')
res = re.findall(regex_pgp_message, content)
for it in res:
content = content.replace(it, '')
return content
def detect_languages(self, min_len=600, num_langs=3, min_proportion=0.2, min_probability=0.7):
languages = []
## CLEAN CONTENT ##
content = self.get_html2text_content(ignore_links=True)
content = self._languages_cleaner(content=content)
# REMOVE USELESS SPACE
content = ' '.join(content.split())
# - CLEAN CONTENT - #
if len(content) >= min_len:
for lang in cld3.get_frequent_languages(content, num_langs=num_langs):
if lang.proportion >= min_proportion and lang.probability >= min_probability and lang.is_reliable:
languages.append(lang)
return languages
# def translate(self, content=None): # TODO translation plugin
# # TODO get text language
# if not content:

View File

@ -31,7 +31,8 @@ class Languages(AbstractModule):
if obj.is_crawled():
domain = Domain(obj.get_domain())
for lang in obj.get_languages(min_probability=0.8):
domain.add_language(lang.language)
print(lang)
domain.add_language(lang)
if __name__ == '__main__':

View File

@ -42,7 +42,7 @@ scrapy>2.0.0
scrapy-splash>=0.7.2
# Languages
pycld3>0.20
gcld3
libretranslatepy
#Graph