mirror of https://github.com/CIRCL/lookyloo
fix: missing future
parent
ee1ad48b25
commit
a26e80b093
|
@ -1,5 +1,7 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
|
||||
from io import BytesIO
|
||||
|
@ -53,22 +55,22 @@ class MISPs(Mapping, AbstractModule): # type: ignore[type-arg]
|
|||
|
||||
return True
|
||||
|
||||
def __getitem__(self, name: str) -> 'MISP':
|
||||
def __getitem__(self, name: str) -> MISP:
|
||||
return self.__misps[name]
|
||||
|
||||
def __iter__(self) -> Iterator[dict[str, 'MISP']]:
|
||||
def __iter__(self) -> Iterator[dict[str, MISP]]:
|
||||
return iter(self.__misps)
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self.__misps)
|
||||
|
||||
@property
|
||||
def default_misp(self) -> 'MISP':
|
||||
def default_misp(self) -> MISP:
|
||||
return self.__misps[self.default_instance]
|
||||
|
||||
def export(self, cache: 'CaptureCache', is_public_instance: bool=False,
|
||||
submitted_filename: Optional[str]=None,
|
||||
submitted_file: Optional[BytesIO]=None) -> MISPEvent:
|
||||
def export(self, cache: CaptureCache, is_public_instance: bool=False,
|
||||
submitted_filename: str | None=None,
|
||||
submitted_file: BytesIO | None=None) -> MISPEvent:
|
||||
'''Export a capture in MISP format. You can POST the return of this method
|
||||
directly to a MISP instance and it will create an event.'''
|
||||
public_domain = get_config('generic', 'public_domain')
|
||||
|
@ -103,7 +105,7 @@ class MISPs(Mapping, AbstractModule): # type: ignore[type-arg]
|
|||
lookyloo_link.distribution = 0
|
||||
initial_obj.add_reference(lookyloo_link, 'captured-by', 'Capture on lookyloo')
|
||||
|
||||
redirects: List[URLObject] = []
|
||||
redirects: list[URLObject] = []
|
||||
for nb, url in enumerate(cache.redirects):
|
||||
if url == cache.url:
|
||||
continue
|
||||
|
@ -163,7 +165,7 @@ class MISP(AbstractModule):
|
|||
self.enable_push = bool(self.config.get('enable_push', False))
|
||||
self.allow_auto_trigger = bool(self.config.get('allow_auto_trigger', False))
|
||||
|
||||
self.default_tags: List[str] = self.config.get('default_tags') # type: ignore
|
||||
self.default_tags: list[str] = self.config.get('default_tags') # type: ignore
|
||||
self.auto_publish = bool(self.config.get('auto_publish', False))
|
||||
self.storage_dir_misp = get_homedir() / 'misp'
|
||||
self.storage_dir_misp.mkdir(parents=True, exist_ok=True)
|
||||
|
@ -173,7 +175,7 @@ class MISP(AbstractModule):
|
|||
def get_fav_tags(self) -> dict[Any, Any] | list[MISPTag]:
|
||||
return self.client.tags(pythonify=True, favouritesOnly=1)
|
||||
|
||||
def _prepare_push(self, to_push: Union[List[MISPEvent], MISPEvent], allow_duplicates: bool=False, auto_publish: Optional[bool]=False) -> Union[List[MISPEvent], Dict[str, str]]:
|
||||
def _prepare_push(self, to_push: list[MISPEvent] | MISPEvent, allow_duplicates: bool=False, auto_publish: bool | None=False) -> list[MISPEvent] | dict[str, str]:
|
||||
'''Adds the pre-configured information as required by the instance.
|
||||
If duplicates aren't allowed, they will be automatically skiped and the
|
||||
extends_uuid key in the next element in the list updated'''
|
||||
|
@ -200,7 +202,7 @@ class MISP(AbstractModule):
|
|||
events_to_push.append(event)
|
||||
return events_to_push
|
||||
|
||||
def push(self, to_push: Union[List[MISPEvent], MISPEvent], allow_duplicates: bool=False, auto_publish: Optional[bool]=None) -> Union[List[MISPEvent], Dict[Any, Any]]:
|
||||
def push(self, to_push: list[MISPEvent] | MISPEvent, allow_duplicates: bool=False, auto_publish: bool | None=None) -> list[MISPEvent] | dict[Any, Any]:
|
||||
if auto_publish is None:
|
||||
auto_publish = self.auto_publish
|
||||
if self.available and self.enable_push:
|
||||
|
@ -233,14 +235,14 @@ class MISP(AbstractModule):
|
|||
else:
|
||||
return {'error': 'Module not available or push not enabled.'}
|
||||
|
||||
def get_existing_event_url(self, permaurl: str) -> Optional[str]:
|
||||
def get_existing_event_url(self, permaurl: str) -> str | None:
|
||||
attributes = self.client.search('attributes', value=permaurl, limit=1, page=1, pythonify=True)
|
||||
if not attributes or not isinstance(attributes[0], MISPAttribute):
|
||||
return None
|
||||
url = f'{self.client.root_url}/events/{attributes[0].event_id}'
|
||||
return url
|
||||
|
||||
def get_existing_event(self, permaurl: str) -> Optional[MISPEvent]:
|
||||
def get_existing_event(self, permaurl: str) -> MISPEvent | None:
|
||||
attributes = self.client.search('attributes', value=permaurl, limit=1, page=1, pythonify=True)
|
||||
if not attributes or not isinstance(attributes[0], MISPAttribute):
|
||||
return None
|
||||
|
@ -249,7 +251,7 @@ class MISP(AbstractModule):
|
|||
return event
|
||||
return None
|
||||
|
||||
def lookup(self, node: URLNode, hostnode: HostNode) -> Union[Dict[str, Set[str]], Dict[str, Any]]:
|
||||
def lookup(self, node: URLNode, hostnode: HostNode) -> dict[str, set[str]] | dict[str, Any]:
|
||||
if self.available and self.enable_lookup:
|
||||
tld = self.psl.publicsuffix(hostnode.name)
|
||||
domain = re.sub(f'.{tld}$', '', hostnode.name).split('.')[-1]
|
||||
|
@ -265,7 +267,7 @@ class MISP(AbstractModule):
|
|||
if attributes := self.client.search(controller='attributes', value=to_lookup,
|
||||
enforce_warninglist=True, pythonify=True):
|
||||
if isinstance(attributes, list):
|
||||
to_return: Dict[str, Set[str]] = defaultdict(set)
|
||||
to_return: dict[str, set[str]] = defaultdict(set)
|
||||
# NOTE: We have MISPAttribute in that list
|
||||
for a in attributes:
|
||||
to_return[a.event_id].add(a.value) # type: ignore
|
||||
|
|
Loading…
Reference in New Issue