chg: Improve search methods
parent
10f706cfcb
commit
482a9343d1
|
@ -32,12 +32,16 @@ class PyMISPWarningListsError(Exception):
|
||||||
|
|
||||||
class WarningList():
|
class WarningList():
|
||||||
|
|
||||||
|
expected_types = ['string', 'substring', 'hostname', 'cidr']
|
||||||
|
|
||||||
def __init__(self, warninglist, slow_search=False):
|
def __init__(self, warninglist, slow_search=False):
|
||||||
self.warninglist = warninglist
|
self.warninglist = warninglist
|
||||||
self.list = self.warninglist['list']
|
self.list = self.warninglist['list']
|
||||||
self.description = self.warninglist['description']
|
self.description = self.warninglist['description']
|
||||||
self.version = int(self.warninglist['version'])
|
self.version = int(self.warninglist['version'])
|
||||||
self.name = self.warninglist['name']
|
self.name = self.warninglist['name']
|
||||||
|
if self.warninglist['type'] not in self.expected_types:
|
||||||
|
raise Exception('Unexpected type, please update the expected_type list')
|
||||||
self.type = self.warninglist['type']
|
self.type = self.warninglist['type']
|
||||||
if self.warninglist.get('matching_attributes'):
|
if self.warninglist.get('matching_attributes'):
|
||||||
self.matching_attributes = self.warninglist['matching_attributes']
|
self.matching_attributes = self.warninglist['matching_attributes']
|
||||||
|
@ -45,8 +49,8 @@ class WarningList():
|
||||||
self.slow_search = slow_search
|
self.slow_search = slow_search
|
||||||
self._network_objects = []
|
self._network_objects = []
|
||||||
|
|
||||||
if self.slow_search:
|
if self.slow_search and self.type == 'cidr':
|
||||||
self._network_objects = self._slow_index()
|
self._network_objects = self._network_index()
|
||||||
# If network objects is empty, reverting to default anyway
|
# If network objects is empty, reverting to default anyway
|
||||||
if not self._network_objects:
|
if not self._network_objects:
|
||||||
self.slow_search = False
|
self.slow_search = False
|
||||||
|
@ -70,7 +74,7 @@ class WarningList():
|
||||||
def _fast_search(self, value):
|
def _fast_search(self, value):
|
||||||
return value in self.list
|
return value in self.list
|
||||||
|
|
||||||
def _slow_index(self):
|
def _network_index(self):
|
||||||
to_return = []
|
to_return = []
|
||||||
for entry in self.list:
|
for entry in self.list:
|
||||||
try:
|
try:
|
||||||
|
@ -81,21 +85,33 @@ class WarningList():
|
||||||
return to_return
|
return to_return
|
||||||
|
|
||||||
def _slow_search(self, value):
|
def _slow_search(self, value):
|
||||||
|
if self.type == 'string':
|
||||||
|
# Exact match only, using fast search
|
||||||
|
return self._fast_search(value)
|
||||||
|
elif self.type == 'substring':
|
||||||
|
# Expected to match on a part of the value
|
||||||
|
# i.e.: value = 'blah.de' self.list == ['.fr', '.de']
|
||||||
|
return any(v in value for v in self.list)
|
||||||
|
elif self.type == 'hostname':
|
||||||
|
# Expected to match on hostnames in URLs (i.e. the search query is a URL)
|
||||||
|
# So we do a reverse search if any of the entries in the list are present in the URL
|
||||||
|
# i.e.: value = 'http://foo.blah.de/meh' self.list == ['blah.de', 'blah.fr']
|
||||||
|
return any(v in value for v in self.list)
|
||||||
|
elif self.type == 'cidr':
|
||||||
try:
|
try:
|
||||||
value = ip_address(value)
|
value = ip_address(value)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
# The value to search isn't an IP address, falling back to default
|
# The value to search isn't an IP address, falling back to default
|
||||||
return self._fast_search(value)
|
return self._fast_search(value)
|
||||||
for obj in self._network_objects:
|
return any((value == obj or value in obj) for obj in self._network_objects)
|
||||||
if value == obj or value in obj:
|
|
||||||
return True
|
|
||||||
# If nothing has been found yet, fallback to default
|
|
||||||
return self._fast_search(value)
|
|
||||||
|
|
||||||
|
|
||||||
class WarningLists(collections.Mapping):
|
class WarningLists(collections.Mapping):
|
||||||
|
|
||||||
def __init__(self, slow_search=False):
|
def __init__(self, slow_search=False):
|
||||||
|
"""Load all the warning lists from the package.
|
||||||
|
:slow_search: If true, uses the most appropriate search method. Can be slower. Default: exact match.
|
||||||
|
"""
|
||||||
self.root_dir_warninglists = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispwarninglists'].__file__)),
|
self.root_dir_warninglists = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispwarninglists'].__file__)),
|
||||||
'data', 'misp-warninglists', 'lists')
|
'data', 'misp-warninglists', 'lists')
|
||||||
self.warninglists = {}
|
self.warninglists = {}
|
||||||
|
|
Loading…
Reference in New Issue