mirror of https://github.com/CIRCL/lookyloo
chg: Add body hash and domains in MISP lookup
parent
d4b71dcf4a
commit
1ae02e0dea
|
@ -5,6 +5,7 @@ import hashlib
|
|||
from urllib.parse import urlsplit
|
||||
from typing import List, Tuple, Set, Dict, Optional, Iterable
|
||||
from collections import defaultdict
|
||||
import re
|
||||
|
||||
from redis import Redis
|
||||
from har2tree import CrawledTree
|
||||
|
@ -69,7 +70,7 @@ class Indexing():
|
|||
for cn, cn_freq in self.cookies_names:
|
||||
for domain, d_freq in self.get_cookie_domains(cn):
|
||||
tld = psl.get_tld(domain)
|
||||
main_domain_part = domain.strip(f'.{tld}').split('.')[-1]
|
||||
main_domain_part = re.sub(f'.{tld}$', '', domain).split('.')[-1]
|
||||
pipeline.zincrby('aggregate_domains_cn', cn_freq, f'{main_domain_part}|{cn}')
|
||||
pipeline.zincrby('aggregate_cn_domains', d_freq, f'{cn}|{main_domain_part}')
|
||||
pipeline.execute()
|
||||
|
|
|
@ -10,8 +10,9 @@ from pathlib import Path
|
|||
import time
|
||||
import logging
|
||||
import socket
|
||||
import re
|
||||
|
||||
from .helpers import get_homedir, get_config
|
||||
from .helpers import get_homedir, get_config, get_public_suffix_list
|
||||
from .exceptions import ConfigError
|
||||
|
||||
import vt # type: ignore
|
||||
|
@ -55,6 +56,7 @@ class MISP():
|
|||
self.auto_publish = config.get('auto_publish')
|
||||
self.storage_dir_misp = get_homedir() / 'misp'
|
||||
self.storage_dir_misp.mkdir(parents=True, exist_ok=True)
|
||||
self.psl = get_public_suffix_list()
|
||||
|
||||
def get_fav_tags(self):
|
||||
return self.client.tags(pythonify=True, favouritesOnly=1)
|
||||
|
@ -124,9 +126,13 @@ class MISP():
|
|||
|
||||
def lookup(self, node: URLNode, hostnode: HostNode) -> Union[Dict[str, Set[str]], Dict[str, Any]]:
|
||||
if self.available and self.enable_lookup:
|
||||
to_lookup = [node.name, node.hostname] + hostnode.resolved_ips
|
||||
tld = self.psl.get_tld(hostnode.name)
|
||||
domain = re.sub(f'.{tld}$', '', hostnode.name).split('.')[-1]
|
||||
to_lookup = [node.name, hostnode.name, f'{domain}.{tld}'] + hostnode.resolved_ips
|
||||
if hasattr(hostnode, 'cnames'):
|
||||
to_lookup += hostnode.cnames
|
||||
if not node.empty_response:
|
||||
to_lookup.append(node.body_hash)
|
||||
if attributes := self.client.search(controller='attributes', value=to_lookup,
|
||||
enforce_warninglist=True, pythonify=True):
|
||||
if isinstance(attributes, list):
|
||||
|
|
Loading…
Reference in New Issue