158 lines
5.8 KiB
Python
158 lines
5.8 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import json
|
|
from json import JSONEncoder
|
|
import sys
|
|
import collections
|
|
from glob import glob
|
|
from ipaddress import ip_address, ip_network
|
|
from pathlib import Path
|
|
|
|
|
|
try:
|
|
import jsonschema
|
|
HAS_JSONSCHEMA = True
|
|
except ImportError:
|
|
HAS_JSONSCHEMA = False
|
|
|
|
|
|
class EncodeWarningList(JSONEncoder):
|
|
def default(self, obj):
|
|
if isinstance(obj, WarningList):
|
|
return obj.to_dict()
|
|
return JSONEncoder.default(self, obj)
|
|
|
|
|
|
class PyMISPWarningListsError(Exception):
|
|
def __init__(self, message):
|
|
super(PyMISPWarningListsError, self).__init__(message)
|
|
self.message = message
|
|
|
|
|
|
class WarningList():
|
|
|
|
expected_types = ['string', 'substring', 'hostname', 'cidr', 'regex']
|
|
|
|
def __init__(self, warninglist, slow_search=False):
|
|
self.warninglist = warninglist
|
|
self.list = self.warninglist['list']
|
|
self.description = self.warninglist['description']
|
|
self.version = int(self.warninglist['version'])
|
|
self.name = self.warninglist['name']
|
|
if self.warninglist['type'] not in self.expected_types:
|
|
raise PyMISPWarningListsError(f'Unexpected type ({self.warninglist["type"]}), please update the expected_type list')
|
|
self.type = self.warninglist['type']
|
|
if self.warninglist.get('matching_attributes'):
|
|
self.matching_attributes = self.warninglist['matching_attributes']
|
|
|
|
self.slow_search = slow_search
|
|
self._network_objects = []
|
|
|
|
if self.slow_search and self.type == 'cidr':
|
|
self._network_objects = self._network_index()
|
|
# If network objects is empty, reverting to default anyway
|
|
if not self._network_objects:
|
|
self.slow_search = False
|
|
|
|
def __repr__(self):
|
|
return f'<{self.__class__.__name__}(type="{self.name}", version="{self.version}", description="{self.description}")'
|
|
|
|
def __contains__(self, value):
|
|
if self.slow_search:
|
|
return self._slow_search(value)
|
|
return self._fast_search(value)
|
|
|
|
def to_dict(self):
|
|
to_return = {'list': [str(e) for e in self.list], 'name': self.name,
|
|
'description': self.description, 'version': self.version,
|
|
'type': self.type}
|
|
if hasattr(self, 'matching_attributes'):
|
|
to_return['matching_attributes'] = self.matching_attributes
|
|
return to_return
|
|
|
|
def to_json(self):
|
|
return json.dumps(self, cls=EncodeWarningList)
|
|
|
|
def _fast_search(self, value):
|
|
return value in self.list
|
|
|
|
def _network_index(self):
|
|
to_return = []
|
|
for entry in self.list:
|
|
try:
|
|
# Try if the entry is a network bloc or an IP
|
|
to_return.append(ip_network(entry))
|
|
except ValueError:
|
|
pass
|
|
return to_return
|
|
|
|
def _slow_search(self, value):
|
|
if self.type == 'string':
|
|
# Exact match only, using fast search
|
|
return self._fast_search(value)
|
|
elif self.type == 'substring':
|
|
# Expected to match on a part of the value
|
|
# i.e.: value = 'blah.de' self.list == ['.fr', '.de']
|
|
return any(v in value for v in self.list)
|
|
elif self.type == 'hostname':
|
|
# Expected to match on hostnames in URLs (i.e. the search query is a URL)
|
|
# So we do a reverse search if any of the entries in the list are present in the URL
|
|
# i.e.: value = 'http://foo.blah.de/meh' self.list == ['blah.de', 'blah.fr']
|
|
return any(v in value for v in self.list)
|
|
elif self.type == 'cidr':
|
|
try:
|
|
value = ip_address(value)
|
|
except ValueError:
|
|
# The value to search isn't an IP address, falling back to default
|
|
return self._fast_search(value)
|
|
return any((value == obj or value in obj) for obj in self._network_objects)
|
|
|
|
|
|
class WarningLists(collections.Mapping):
|
|
|
|
def __init__(self, slow_search=False, lists=False):
|
|
"""Load all the warning lists from the package.
|
|
:slow_search: If true, uses the most appropriate search method. Can be slower. Default: exact match.
|
|
:lists: A list of warning lists (typically fetched from a MISP instance)
|
|
"""
|
|
if not lists:
|
|
lists = []
|
|
self.root_dir_warninglists = Path(sys.modules['pymispwarninglists'].__file__).parent / 'data' / 'misp-warninglists' / 'lists'
|
|
for warninglist_file in glob(str(self.root_dir_warninglists / '*' / 'list.json')):
|
|
with open(warninglist_file, 'r') as f:
|
|
lists.append(json.load(f))
|
|
if not lists:
|
|
raise PyMISPWarningListsError('Unable to load the lists. Do not forget to initialize the submodule (git submodule update --init).')
|
|
self.warninglists = {}
|
|
for warninglist in lists:
|
|
self.warninglists[warninglist['name']] = WarningList(warninglist, slow_search)
|
|
|
|
def validate_with_schema(self):
|
|
if not HAS_JSONSCHEMA:
|
|
raise ImportError('jsonschema is required: pip install jsonschema')
|
|
schema = Path(sys.modules['pymispwarninglists'].__file__).parent / 'data' / 'misp-warninglists' / 'schema.json'
|
|
with open(schema, 'r') as f:
|
|
loaded_schema = json.load(f)
|
|
for w in self.warninglists.values():
|
|
jsonschema.validate(w.warninglist, loaded_schema)
|
|
|
|
def __getitem__(self, name):
|
|
return self.warninglists[name]
|
|
|
|
def __iter__(self):
|
|
return iter(self.warninglists)
|
|
|
|
def search(self, value):
|
|
matches = []
|
|
for name, wl in self.warninglists.items():
|
|
if value in wl:
|
|
matches.append(wl)
|
|
return matches
|
|
|
|
def __len__(self):
|
|
return len(self.warninglists)
|
|
|
|
def get_loaded_lists(self):
|
|
return self.warninglists
|