mirror of https://github.com/MISP/PyMISP
Merge branch 'objects'
commit
b1989f16f2
|
@ -0,0 +1,3 @@
|
|||
[submodule "pymisp/data/misp-objects"]
|
||||
path = pymisp/data/misp-objects
|
||||
url = https://github.com/MISP/misp-objects
|
18
.travis.yml
18
.travis.yml
|
@ -2,22 +2,32 @@ language: python
|
|||
|
||||
cache: pip
|
||||
|
||||
addons:
|
||||
apt:
|
||||
sources: [ 'ubuntu-toolchain-r-test' ]
|
||||
packages:
|
||||
- libstdc++6
|
||||
- libfuzzy-dev
|
||||
|
||||
python:
|
||||
- "2.7"
|
||||
- "3.4"
|
||||
- "3.5"
|
||||
- "3.5-dev"
|
||||
- "3.6"
|
||||
- "3.6-dev"
|
||||
- "3.7-dev"
|
||||
- "nightly"
|
||||
|
||||
install:
|
||||
- pip install -U nose
|
||||
- pip install coveralls
|
||||
- pip install codecov
|
||||
- pip install requests-mock
|
||||
- pip install requests-mock pytest
|
||||
- pip install https://github.com/lief-project/packages/raw/lief-master-latest/pylief-0.7.0.dev.zip
|
||||
- pip install git+https://github.com/kbandla/pydeep.git
|
||||
- pip install python-magic
|
||||
- pip install .
|
||||
- pushd tests
|
||||
- git clone https://github.com/viper-framework/viper-test-files.git
|
||||
- popd
|
||||
|
||||
script:
|
||||
- nosetests --with-coverage --cover-package=pymisp tests/test_offline.py
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from pymisp import PyMISP
|
||||
from pymisp.tools import make_binary_objects
|
||||
import traceback
|
||||
from keys import misp_url, misp_key, misp_verifycert
|
||||
import glob
|
||||
import argparse
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser(description='Extract indicators out of binaries and add MISP objects to a MISP instance.')
|
||||
parser.add_argument("-e", "--event", required=True, help="Event ID to update.")
|
||||
parser.add_argument("-p", "--path", required=True, help="Path to process (expanded using glob).")
|
||||
args = parser.parse_args()
|
||||
|
||||
pymisp = PyMISP(misp_url, misp_key, misp_verifycert)
|
||||
|
||||
for f in glob.glob(args.path):
|
||||
try:
|
||||
fo, peo, seos = make_binary_objects(f)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
|
||||
if seos:
|
||||
for s in seos:
|
||||
template_id = pymisp.get_object_template_id(s.template_uuid)
|
||||
r = pymisp.add_object(args.event, template_id, s)
|
||||
|
||||
if peo:
|
||||
template_id = pymisp.get_object_template_id(peo.template_uuid)
|
||||
r = pymisp.add_object(args.event, template_id, peo)
|
||||
for ref in peo.ObjectReference:
|
||||
r = pymisp.add_object_reference(ref)
|
||||
|
||||
if fo:
|
||||
template_id = pymisp.get_object_template_id(fo.template_uuid)
|
||||
response = pymisp.add_object(args.event, template_id, fo)
|
||||
for ref in fo.ObjectReference:
|
||||
r = pymisp.add_object_reference(ref)
|
|
@ -0,0 +1,67 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import argparse
|
||||
import json
|
||||
|
||||
try:
|
||||
from pymisp import MISPEncode
|
||||
from pymisp.tools import make_binary_objects
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
|
||||
def check():
|
||||
missing_dependencies = {'pydeep': False, 'lief': False, 'magic': False, 'pymisp': False}
|
||||
try:
|
||||
import pymisp # noqa
|
||||
except ImportError:
|
||||
missing_dependencies['pymisp'] = 'Please install pydeep: pip install pymisp'
|
||||
try:
|
||||
import pydeep # noqa
|
||||
except ImportError:
|
||||
missing_dependencies['pydeep'] = 'Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git'
|
||||
try:
|
||||
import lief # noqa
|
||||
except ImportError:
|
||||
missing_dependencies['lief'] = 'Please install lief, documentation here: https://github.com/lief-project/LIEF'
|
||||
try:
|
||||
import magic # noqa
|
||||
except ImportError:
|
||||
missing_dependencies['magic'] = 'Please install python-magic: pip install python-magic.'
|
||||
return json.dumps(missing_dependencies)
|
||||
|
||||
|
||||
def make_objects(path):
|
||||
to_return = {'objects': [], 'references': []}
|
||||
fo, peo, seos = make_binary_objects(path)
|
||||
|
||||
if seos:
|
||||
for s in seos:
|
||||
to_return['objects'].append(s)
|
||||
if s.ObjectReference:
|
||||
to_return['references'] += s.ObjectReference
|
||||
|
||||
if peo:
|
||||
to_return['objects'].append(peo)
|
||||
if peo.ObjectReference:
|
||||
to_return['references'] += peo.ObjectReference
|
||||
|
||||
if fo:
|
||||
to_return['objects'].append(fo)
|
||||
if fo.ObjectReference:
|
||||
to_return['references'] += fo.ObjectReference
|
||||
return json.dumps(to_return, cls=MISPEncode)
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser(description='Extract indicators out of binaries and returns MISP objects.')
|
||||
group = parser.add_mutually_exclusive_group()
|
||||
group.add_argument("-p", "--path", help="Path to process.")
|
||||
group.add_argument("-c", "--check", action='store_true', help="Check the dependencies.")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.check:
|
||||
print(check())
|
||||
if args.path:
|
||||
obj = make_objects(args.path)
|
||||
print(obj)
|
|
@ -1,7 +1,12 @@
|
|||
__version__ = '2.4.79'
|
||||
__version__ = '2.4.80'
|
||||
|
||||
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey
|
||||
from .api import PyMISP
|
||||
from .mispevent import MISPEvent, MISPAttribute, EncodeUpdate, EncodeFull
|
||||
from .tools.neo4j import Neo4j
|
||||
from .tools import stix
|
||||
try:
|
||||
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate # noqa
|
||||
from .api import PyMISP # noqa
|
||||
from .abstract import AbstractMISP, MISPEncode # noqa
|
||||
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject # noqa
|
||||
from .tools import AbstractMISPObjectGenerator # noqa
|
||||
from .tools import Neo4j # noqa
|
||||
from .tools import stix # noqa
|
||||
except ImportError:
|
||||
pass
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import abc
|
||||
import json
|
||||
from json import JSONEncoder
|
||||
import collections
|
||||
import six # Remove that import when discarding python2 support.
|
||||
|
||||
if six.PY2:
|
||||
import warnings
|
||||
warnings.warn("You're using python 2, it is strongly recommended to use python >=3.5")
|
||||
|
||||
|
||||
class MISPEncode(JSONEncoder):
|
||||
|
||||
def default(self, obj):
|
||||
if isinstance(obj, AbstractMISP):
|
||||
return obj.jsonable()
|
||||
return JSONEncoder.default(self, obj)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta) # Remove that line when discarding python2 support.
|
||||
class AbstractMISP(collections.MutableMapping):
|
||||
|
||||
__not_jsonable = []
|
||||
|
||||
@property
|
||||
def __properties(self):
|
||||
to_return = []
|
||||
for prop, value in vars(self).items():
|
||||
if prop.startswith('_') or prop in self.__not_jsonable:
|
||||
continue
|
||||
to_return.append(prop)
|
||||
return to_return
|
||||
|
||||
def from_dict(self, **kwargs):
|
||||
for prop, value in kwargs.items():
|
||||
if value is None:
|
||||
continue
|
||||
setattr(self, prop, value)
|
||||
|
||||
def update_not_jsonable(self, *args):
|
||||
self.__not_jsonable += args
|
||||
|
||||
def set_not_jsonable(self, *args):
|
||||
self.__not_jsonable = args
|
||||
|
||||
def from_json(self, json_string):
|
||||
"""Load a JSON string"""
|
||||
self.from_dict(json.loads(json_string))
|
||||
|
||||
def to_dict(self):
|
||||
to_return = {}
|
||||
for attribute in self.__properties:
|
||||
val = getattr(self, attribute, None)
|
||||
if val is None:
|
||||
continue
|
||||
to_return[attribute] = val
|
||||
return to_return
|
||||
|
||||
def jsonable(self):
|
||||
return self.to_dict()
|
||||
|
||||
def to_json(self):
|
||||
return json.dumps(self.to_dict(), cls=MISPEncode)
|
||||
|
||||
def __getitem__(self, key):
|
||||
return getattr(self, key)
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
setattr(self, key, value)
|
||||
|
||||
def __delitem__(self, key):
|
||||
delattr(self, key)
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self.to_dict())
|
||||
|
||||
def __len__(self):
|
||||
return len(self.to_dict())
|
|
@ -18,7 +18,7 @@ try:
|
|||
from urllib.parse import urljoin
|
||||
except ImportError:
|
||||
from urlparse import urljoin
|
||||
warnings.warn("You're using python 2, it is strongly recommended to use python >=3.4")
|
||||
warnings.warn("You're using python 2, it is strongly recommended to use python >=3.5")
|
||||
from io import BytesIO, open
|
||||
import zipfile
|
||||
|
||||
|
@ -37,7 +37,8 @@ except ImportError:
|
|||
|
||||
from . import __version__
|
||||
from .exceptions import PyMISPError, SearchError, MissingDependency, NoURL, NoKey
|
||||
from .mispevent import MISPEvent, MISPAttribute, EncodeUpdate
|
||||
from .mispevent import MISPEvent, MISPAttribute
|
||||
from .abstract import MISPEncode
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -318,7 +319,7 @@ class PyMISP(object):
|
|||
session = self.__prepare_session()
|
||||
url = urljoin(self.root_url, 'events')
|
||||
if isinstance(event, MISPEvent):
|
||||
event = json.dumps(event, cls=EncodeUpdate)
|
||||
event = json.dumps(event, cls=MISPEncode)
|
||||
if isinstance(event, basestring):
|
||||
response = session.post(url, data=event)
|
||||
else:
|
||||
|
@ -334,7 +335,7 @@ class PyMISP(object):
|
|||
session = self.__prepare_session()
|
||||
url = urljoin(self.root_url, 'events/{}'.format(event_id))
|
||||
if isinstance(event, MISPEvent):
|
||||
event = json.dumps(event, cls=EncodeUpdate)
|
||||
event = json.dumps(event, cls=MISPEncode)
|
||||
if isinstance(event, basestring):
|
||||
response = session.post(url, data=event)
|
||||
else:
|
||||
|
@ -460,7 +461,7 @@ class PyMISP(object):
|
|||
else:
|
||||
session = self.__prepare_session()
|
||||
url = urljoin(self.root_url, 'attributes/add/{}'.format(eventID_to_update))
|
||||
response = self._check_response(session.post(url, data=json.dumps(a, cls=EncodeUpdate)))
|
||||
response = self._check_response(session.post(url, data=json.dumps(a, cls=MISPEncode)))
|
||||
return response
|
||||
|
||||
def add_named_attribute(self, event, type_value, value, category=None, to_ids=False, comment=None, distribution=None, proposal=False, **kwargs):
|
||||
|
@ -757,7 +758,7 @@ class PyMISP(object):
|
|||
url = urljoin(self.root_url, 'shadow_attributes/{}/{}'.format(path, id))
|
||||
if path in ['add', 'edit']:
|
||||
query = {'request': {'ShadowAttribute': attribute}}
|
||||
response = session.post(url, data=json.dumps(query, cls=EncodeUpdate))
|
||||
response = session.post(url, data=json.dumps(query, cls=MISPEncode))
|
||||
elif path == 'view':
|
||||
response = session.get(url)
|
||||
else: # accept or discard
|
||||
|
@ -1598,6 +1599,35 @@ class PyMISP(object):
|
|||
response = session.post(url)
|
||||
return self._check_response(response)
|
||||
|
||||
# ###################
|
||||
# ### Objects ###
|
||||
# ###################
|
||||
|
||||
def add_object(self, event_id, template_id, misp_object):
|
||||
session = self.__prepare_session()
|
||||
url = urljoin(self.root_url, 'objects/add/{}/{}'.format(event_id, template_id))
|
||||
response = session.post(url, data=misp_object.to_json())
|
||||
return self._check_response(response)
|
||||
|
||||
def add_object_reference(self, misp_object_reference):
|
||||
session = self.__prepare_session()
|
||||
url = urljoin(self.root_url, 'object_references/add')
|
||||
response = session.post(url, data=misp_object_reference.to_json())
|
||||
return self._check_response(response)
|
||||
|
||||
def get_object_templates_list(self):
|
||||
session = self.__prepare_session()
|
||||
url = urljoin(self.root_url, 'objectTemplates')
|
||||
response = session.get(url)
|
||||
return self._check_response(response)['response']
|
||||
|
||||
def get_object_template_id(self, object_uuid):
|
||||
templates = self.get_object_templates_list()
|
||||
for t in templates:
|
||||
if t['ObjectTemplate']['uuid'] == object_uuid:
|
||||
return t['ObjectTemplate']['id']
|
||||
raise Exception('Unable to find template uuid {} on the MISP instance'.format(object_uuid))
|
||||
|
||||
# ###########################
|
||||
# ####### Deprecated ########
|
||||
# ###########################
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Subproject commit d22ced3b82c4bf3012bf0162831d862685944c9a
|
|
@ -1,6 +1,7 @@
|
|||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
|
||||
class PyMISPError(Exception):
|
||||
def __init__(self, message):
|
||||
super(PyMISPError, self).__init__(message)
|
||||
|
@ -29,3 +30,16 @@ class NoURL(PyMISPError):
|
|||
|
||||
class NoKey(PyMISPError):
|
||||
pass
|
||||
|
||||
|
||||
class MISPObjectException(PyMISPError):
|
||||
pass
|
||||
|
||||
|
||||
class InvalidMISPObject(MISPObjectException):
|
||||
"""Exception raised when an object doesn't respect the contrains in the definition"""
|
||||
pass
|
||||
|
||||
class UnknownMISPObjectTemplate(MISPObjectException):
|
||||
"""Exception raised when the template is unknown"""
|
||||
pass
|
||||
|
|
|
@ -4,13 +4,24 @@
|
|||
import datetime
|
||||
import time
|
||||
import json
|
||||
from json import JSONEncoder
|
||||
import os
|
||||
import warnings
|
||||
import base64
|
||||
from io import BytesIO
|
||||
from zipfile import ZipFile
|
||||
import hashlib
|
||||
import sys
|
||||
import uuid
|
||||
from collections import Counter
|
||||
|
||||
from .abstract import AbstractMISP
|
||||
from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError
|
||||
|
||||
|
||||
import six # Remove that import when discarding python2 support.
|
||||
|
||||
if six.PY2:
|
||||
import warnings
|
||||
warnings.warn("You're using python 2, it is strongly recommended to use python >=3.5")
|
||||
|
||||
try:
|
||||
from dateutil.parser import parse
|
||||
|
@ -36,31 +47,35 @@ except ImportError:
|
|||
except ImportError:
|
||||
has_pyme = False
|
||||
|
||||
from .exceptions import PyMISPError, NewEventError, NewAttributeError
|
||||
|
||||
# Least dirty way to support python 2 and 3
|
||||
try:
|
||||
basestring
|
||||
unicode
|
||||
warnings.warn("You're using python 2, it is strongly recommended to use python >=3.4")
|
||||
except NameError:
|
||||
basestring = str
|
||||
unicode = str
|
||||
|
||||
|
||||
class MISPAttribute(object):
|
||||
def _int_to_str(d):
|
||||
# transform all integer back to string
|
||||
for k, v in d.items():
|
||||
if isinstance(v, (int, float)) and not isinstance(v, bool):
|
||||
d[k] = str(v)
|
||||
return d
|
||||
|
||||
|
||||
class MISPAttribute(AbstractMISP):
|
||||
|
||||
def __init__(self, describe_types=None):
|
||||
if not describe_types:
|
||||
self.resources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
|
||||
with open(os.path.join(self.resources_path, 'describeTypes.json'), 'r') as f:
|
||||
ressources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
|
||||
with open(os.path.join(ressources_path, 'describeTypes.json'), 'r') as f:
|
||||
t = json.load(f)
|
||||
describe_types = t['result']
|
||||
self.describe_types = describe_types
|
||||
self.categories = describe_types['categories']
|
||||
self.types = describe_types['types']
|
||||
self.category_type_mapping = describe_types['category_type_mappings']
|
||||
self.sane_default = describe_types['sane_defaults']
|
||||
self.__categories = describe_types['categories']
|
||||
self.__types = describe_types['types']
|
||||
self.__category_type_mapping = describe_types['category_type_mappings']
|
||||
self.__sane_default = describe_types['sane_defaults']
|
||||
self._reinitialize_attribute()
|
||||
|
||||
def _reinitialize_attribute(self):
|
||||
|
@ -124,75 +139,64 @@ class MISPAttribute(object):
|
|||
return {self.uuid: False}
|
||||
|
||||
def set_all_values(self, **kwargs):
|
||||
# to be deprecated
|
||||
self.from_dict(**kwargs)
|
||||
|
||||
def from_dict(self, **kwargs):
|
||||
if kwargs.get('type') and kwargs.get('category'):
|
||||
if kwargs['type'] not in self.category_type_mapping[kwargs['category']]:
|
||||
raise NewAttributeError('{} and {} is an invalid combination, type for this category has to be in {}'.format(kwargs.get('type'), kwargs.get('category'), (', '.join(self.category_type_mapping[kwargs['category']]))))
|
||||
if kwargs['type'] not in self.__category_type_mapping[kwargs['category']]:
|
||||
raise NewAttributeError('{} and {} is an invalid combination, type for this category has to be in {}'.format(
|
||||
kwargs.get('type'), kwargs.get('category'), (', '.join(self.__category_type_mapping[kwargs['category']]))))
|
||||
# Required
|
||||
if kwargs.get('type'):
|
||||
self.type = kwargs['type']
|
||||
if self.type not in self.types:
|
||||
raise NewAttributeError('{} is invalid, type has to be in {}'.format(self.type, (', '.join(self.types))))
|
||||
elif not self.type:
|
||||
self.type = kwargs.pop('type', None)
|
||||
if self.type is None:
|
||||
raise NewAttributeError('The type of the attribute is required.')
|
||||
if self.type not in self.__types:
|
||||
raise NewAttributeError('{} is invalid, type has to be in {}'.format(self.type, (', '.join(self.__types))))
|
||||
|
||||
type_defaults = self.sane_default[self.type]
|
||||
|
||||
self.value = kwargs.get('value')
|
||||
type_defaults = self.__sane_default[self.type]
|
||||
|
||||
self.value = kwargs.pop('value', None)
|
||||
if self.value is None:
|
||||
raise NewAttributeError('The value of the attribute is required.')
|
||||
|
||||
# Default values
|
||||
if kwargs.get('category'):
|
||||
self.category = kwargs['category']
|
||||
if self.category not in self.categories:
|
||||
raise NewAttributeError('{} is invalid, category has to be in {}'.format(self.category, (', '.join(self.categories))))
|
||||
else:
|
||||
self.category = type_defaults['default_category']
|
||||
self.category = kwargs.pop('category', type_defaults['default_category'])
|
||||
if self.category not in self.__categories:
|
||||
raise NewAttributeError('{} is invalid, category has to be in {}'.format(self.category, (', '.join(self.__categories))))
|
||||
|
||||
self.to_ids = kwargs.get('to_ids')
|
||||
if self.to_ids is None:
|
||||
self.to_ids = bool(int(type_defaults['to_ids']))
|
||||
self.to_ids = kwargs.pop('to_ids', bool(int(type_defaults['to_ids'])))
|
||||
if not isinstance(self.to_ids, bool):
|
||||
raise NewAttributeError('{} is invalid, to_ids has to be True or False'.format(self.to_ids))
|
||||
|
||||
if kwargs.get('comment'):
|
||||
self.comment = kwargs['comment']
|
||||
if kwargs.get('distribution') is not None:
|
||||
self.distribution = int(kwargs['distribution'])
|
||||
self.distribution = int(kwargs.pop('distribution'))
|
||||
if self.distribution not in [0, 1, 2, 3, 4, 5]:
|
||||
raise NewAttributeError('{} is invalid, the distribution has to be in 0, 1, 2, 3, 4, 5'.format(self.distribution))
|
||||
|
||||
# other possible values
|
||||
if kwargs.get('data'):
|
||||
self.data = kwargs['data']
|
||||
self.data = kwargs.pop('data')
|
||||
self._load_data()
|
||||
if kwargs.get('id'):
|
||||
self.id = int(kwargs['id'])
|
||||
self.id = int(kwargs.pop('id'))
|
||||
if kwargs.get('event_id'):
|
||||
self.event_id = int(kwargs['event_id'])
|
||||
if kwargs.get('uuid'):
|
||||
self.uuid = kwargs['uuid']
|
||||
self.event_id = int(kwargs.pop('event_id'))
|
||||
if kwargs.get('timestamp'):
|
||||
self.timestamp = datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=int(kwargs['timestamp']))
|
||||
self.timestamp = datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=int(kwargs.pop('timestamp')))
|
||||
if kwargs.get('sharing_group_id'):
|
||||
self.sharing_group_id = int(kwargs['sharing_group_id'])
|
||||
if kwargs.get('deleted'):
|
||||
self.deleted = kwargs['deleted']
|
||||
if kwargs.get('SharingGroup'):
|
||||
self.SharingGroup = kwargs['SharingGroup']
|
||||
if kwargs.get('ShadowAttribute'):
|
||||
self.ShadowAttribute = kwargs['ShadowAttribute']
|
||||
if kwargs.get('sig'):
|
||||
self.sig = kwargs['sig']
|
||||
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
|
||||
if kwargs.get('Tag'):
|
||||
self.Tag = [t for t in kwargs['Tag'] if t]
|
||||
self.Tag = [t for t in kwargs.pop('Tag', []) if t]
|
||||
|
||||
# If the user wants to disable correlation, let them. Defaults to False.
|
||||
self.disable_correlation = kwargs.get("disable_correlation", False)
|
||||
self.disable_correlation = kwargs.pop("disable_correlation", False)
|
||||
if self.disable_correlation is None:
|
||||
self.disable_correlation = False
|
||||
|
||||
for k, v in kwargs.items():
|
||||
setattr(self, k, v)
|
||||
|
||||
def _prepare_new_malware_sample(self):
|
||||
if '|' in self.value:
|
||||
# Get the filename, ignore the md5, because humans.
|
||||
|
@ -202,8 +206,7 @@ class MISPAttribute(object):
|
|||
self.malware_filename = self.value
|
||||
m = hashlib.md5()
|
||||
m.update(self.data.getvalue())
|
||||
md5 = m.hexdigest()
|
||||
self.value = '{}|{}'.format(self.malware_filename, md5)
|
||||
self.value = self.malware_filename
|
||||
self.malware_binary = self.data
|
||||
self.encrypt = True
|
||||
|
||||
|
@ -225,88 +228,41 @@ class MISPAttribute(object):
|
|||
self._prepare_new_malware_sample()
|
||||
|
||||
def _json(self):
|
||||
to_return = {'type': self.type, 'category': self.category, 'to_ids': self.to_ids,
|
||||
'distribution': self.distribution, 'value': self.value,
|
||||
'comment': self.comment, 'disable_correlation': self.disable_correlation}
|
||||
if self.uuid:
|
||||
to_return['uuid'] = self.uuid
|
||||
if self.sig:
|
||||
to_return['sig'] = self.sig
|
||||
if self.sharing_group_id:
|
||||
to_return['sharing_group_id'] = self.sharing_group_id
|
||||
if self.Tag:
|
||||
to_return['Tag'] = self.Tag
|
||||
if self.data:
|
||||
to_return['data'] = base64.b64encode(self.data.getvalue()).decode()
|
||||
if self.encrypt:
|
||||
to_return['encrypt'] = self.encrypt
|
||||
to_return = _int_to_str(to_return)
|
||||
return to_return
|
||||
# DEPRECATED
|
||||
return self.to_dict()
|
||||
|
||||
def _json_full(self):
|
||||
to_return = self._json()
|
||||
if self.event_id:
|
||||
to_return['event_id'] = self.event_id
|
||||
if self.id:
|
||||
to_return['id'] = self.id
|
||||
if self.timestamp:
|
||||
# Should never be set on an update, MISP will automatically set it to now
|
||||
# DEPRECATED
|
||||
return self.to_dict()
|
||||
|
||||
def to_dict(self, with_timestamp=False):
|
||||
to_return = super(MISPAttribute, self).to_dict()
|
||||
if to_return.get('data'):
|
||||
to_return['data'] = base64.b64encode(self.data.getvalue()).decode()
|
||||
if with_timestamp and to_return.get('timestamp'):
|
||||
to_return['timestamp'] = int(time.mktime(self.timestamp.timetuple()))
|
||||
if self.deleted is not None:
|
||||
to_return['deleted'] = self.deleted
|
||||
if self.ShadowAttribute:
|
||||
to_return['ShadowAttribute'] = self.ShadowAttribute
|
||||
if self.SharingGroup:
|
||||
to_return['SharingGroup'] = self.SharingGroup
|
||||
else:
|
||||
to_return.pop('timestamp', None)
|
||||
to_return = _int_to_str(to_return)
|
||||
return to_return
|
||||
|
||||
|
||||
class EncodeUpdate(JSONEncoder):
|
||||
def default(self, obj):
|
||||
try:
|
||||
return obj._json()
|
||||
except AttributeError:
|
||||
return JSONEncoder.default(self, obj)
|
||||
|
||||
|
||||
class EncodeFull(JSONEncoder):
|
||||
def default(self, obj):
|
||||
try:
|
||||
return obj._json_full()
|
||||
except AttributeError:
|
||||
return JSONEncoder.default(self, obj)
|
||||
|
||||
|
||||
def _int_to_str(d):
|
||||
# transform all integer back to string
|
||||
for k, v in d.items():
|
||||
if isinstance(v, (int, float)) and not isinstance(v, bool):
|
||||
d[k] = str(v)
|
||||
return d
|
||||
|
||||
|
||||
class MISPEvent(object):
|
||||
class MISPEvent(AbstractMISP):
|
||||
|
||||
def __init__(self, describe_types=None, strict_validation=False):
|
||||
self.resources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
|
||||
ressources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
|
||||
if strict_validation:
|
||||
with open(os.path.join(self.resources_path, 'schema.json'), 'r') as f:
|
||||
self.json_schema = json.load(f)
|
||||
with open(os.path.join(ressources_path, 'schema.json'), 'r') as f:
|
||||
self.__json_schema = json.load(f)
|
||||
else:
|
||||
with open(os.path.join(self.resources_path, 'schema-lax.json'), 'r') as f:
|
||||
self.json_schema = json.load(f)
|
||||
with open(os.path.join(ressources_path, 'schema-lax.json'), 'r') as f:
|
||||
self.__json_schema = json.load(f)
|
||||
if not describe_types:
|
||||
with open(os.path.join(self.resources_path, 'describeTypes.json'), 'r') as f:
|
||||
with open(os.path.join(ressources_path, 'describeTypes.json'), 'r') as f:
|
||||
t = json.load(f)
|
||||
describe_types = t['result']
|
||||
self.describe_types = describe_types
|
||||
self.categories = describe_types['categories']
|
||||
self.types = describe_types['types']
|
||||
self.category_type_mapping = describe_types['category_type_mappings']
|
||||
self.sane_default = describe_types['sane_defaults']
|
||||
self.new = True
|
||||
self.dump_full = False
|
||||
|
||||
self.__types = describe_types['types']
|
||||
|
||||
self._reinitialize_event()
|
||||
|
||||
|
@ -339,6 +295,10 @@ class MISPEvent(object):
|
|||
self.RelatedEvent = []
|
||||
self.Tag = []
|
||||
self.Galaxy = None
|
||||
self.Object = None
|
||||
|
||||
def get_known_types(self):
|
||||
return self.__types
|
||||
|
||||
def _serialize(self):
|
||||
return '{date}{threat_level_id}{info}{uuid}{analysis}{timestamp}'.format(
|
||||
|
@ -404,8 +364,6 @@ class MISPEvent(object):
|
|||
self.load(f)
|
||||
|
||||
def load(self, json_event):
|
||||
self.new = False
|
||||
self.dump_full = True
|
||||
if hasattr(json_event, 'read'):
|
||||
# python2 and python3 compatible to find if we have a file
|
||||
json_event = json_event.read()
|
||||
|
@ -420,7 +378,7 @@ class MISPEvent(object):
|
|||
# Invalid event created by MISP up to 2.4.52 (attribute_count is none instead of '0')
|
||||
if event.get('Event') and event.get('Event').get('attribute_count') is None:
|
||||
event['Event']['attribute_count'] = '0'
|
||||
jsonschema.validate(event, self.json_schema)
|
||||
jsonschema.validate(event, self.__json_schema)
|
||||
e = event.get('Event')
|
||||
self._reinitialize_event()
|
||||
self.set_all_values(**e)
|
||||
|
@ -439,140 +397,100 @@ class MISPEvent(object):
|
|||
raise NewEventError('Invalid format for the date: {} - {}'.format(date, type(date)))
|
||||
|
||||
def set_all_values(self, **kwargs):
|
||||
# to be deprecated
|
||||
self.from_dict(**kwargs)
|
||||
|
||||
def from_dict(self, **kwargs):
|
||||
# Required value
|
||||
if kwargs.get('info'):
|
||||
self.info = kwargs['info']
|
||||
elif not self.info:
|
||||
self.info = kwargs.pop('info', None)
|
||||
if not self.info:
|
||||
raise NewAttributeError('The info field of the new event is required.')
|
||||
|
||||
# Default values for a valid event to send to a MISP instance
|
||||
if kwargs.get('distribution') is not None:
|
||||
self.distribution = int(kwargs['distribution'])
|
||||
self.distribution = int(kwargs.pop('distribution'))
|
||||
if self.distribution not in [0, 1, 2, 3, 4]:
|
||||
raise NewEventError('{} is invalid, the distribution has to be in 0, 1, 2, 3, 4'.format(self.distribution))
|
||||
raise NewAttributeError('{} is invalid, the distribution has to be in 0, 1, 2, 3, 4'.format(self.distribution))
|
||||
|
||||
if kwargs.get('threat_level_id') is not None:
|
||||
self.threat_level_id = int(kwargs['threat_level_id'])
|
||||
self.threat_level_id = int(kwargs.pop('threat_level_id'))
|
||||
if self.threat_level_id not in [1, 2, 3, 4]:
|
||||
raise NewEventError('{} is invalid, the threat_level has to be in 1, 2, 3, 4'.format(self.threat_level_id))
|
||||
|
||||
if kwargs.get('analysis') is not None:
|
||||
self.analysis = int(kwargs['analysis'])
|
||||
self.analysis = int(kwargs.pop('analysis'))
|
||||
if self.analysis not in [0, 1, 2]:
|
||||
raise NewEventError('{} is invalid, the analysis has to be in 0, 1, 2'.format(self.analysis))
|
||||
if kwargs.get('published') is not None:
|
||||
self.unpublish()
|
||||
if kwargs.get("published") is True:
|
||||
|
||||
self.published = kwargs.pop('published', None)
|
||||
if self.published is True:
|
||||
self.publish()
|
||||
else:
|
||||
self.unpublish()
|
||||
|
||||
if kwargs.get('date'):
|
||||
self.set_date(kwargs['date'])
|
||||
self.set_date(kwargs.pop('date'))
|
||||
if kwargs.get('Attribute'):
|
||||
for a in kwargs['Attribute']:
|
||||
attribute = MISPAttribute(self.describe_types)
|
||||
for a in kwargs.pop('Attribute'):
|
||||
attribute = MISPAttribute()
|
||||
attribute.set_all_values(**a)
|
||||
self.attributes.append(attribute)
|
||||
|
||||
# All other keys
|
||||
if kwargs.get('id'):
|
||||
self.id = int(kwargs['id'])
|
||||
self.id = int(kwargs.pop('id'))
|
||||
if kwargs.get('orgc_id'):
|
||||
self.orgc_id = int(kwargs['orgc_id'])
|
||||
self.orgc_id = int(kwargs.pop('orgc_id'))
|
||||
if kwargs.get('org_id'):
|
||||
self.org_id = int(kwargs['org_id'])
|
||||
if kwargs.get('uuid'):
|
||||
self.uuid = kwargs['uuid']
|
||||
if kwargs.get('attribute_count'):
|
||||
self.attribute_count = int(kwargs['attribute_count'])
|
||||
self.org_id = int(kwargs.pop('org_id'))
|
||||
if kwargs.get('timestamp'):
|
||||
self.timestamp = datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=int(kwargs['timestamp']))
|
||||
if kwargs.get('proposal_email_lock'):
|
||||
self.proposal_email_lock = kwargs['proposal_email_lock']
|
||||
if kwargs.get('locked'):
|
||||
self.locked = kwargs['locked']
|
||||
self.timestamp = datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=int(kwargs.pop('timestamp')))
|
||||
if kwargs.get('publish_timestamp'):
|
||||
self.publish_timestamp = datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=int(kwargs['publish_timestamp']))
|
||||
self.publish_timestamp = datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=int(kwargs.pop('publish_timestamp')))
|
||||
if kwargs.get('sharing_group_id'):
|
||||
self.sharing_group_id = int(kwargs['sharing_group_id'])
|
||||
if kwargs.get('Org'):
|
||||
self.Org = kwargs['Org']
|
||||
if kwargs.get('Orgc'):
|
||||
self.Orgc = kwargs['Orgc']
|
||||
if kwargs.get('ShadowAttribute'):
|
||||
self.ShadowAttribute = kwargs['ShadowAttribute']
|
||||
self.sharing_group_id = int(kwargs.pop('sharing_group_id'))
|
||||
if kwargs.get('RelatedEvent'):
|
||||
self.RelatedEvent = []
|
||||
for rel_event in kwargs['RelatedEvent']:
|
||||
for rel_event in kwargs.pop('RelatedEvent'):
|
||||
sub_event = MISPEvent()
|
||||
sub_event.load(rel_event)
|
||||
self.RelatedEvent.append(sub_event)
|
||||
if kwargs.get('Galaxy'):
|
||||
self.Galaxy = kwargs['Galaxy']
|
||||
if kwargs.get('Tag'):
|
||||
self.Tag = [t for t in kwargs['Tag'] if t]
|
||||
if kwargs.get('sig'):
|
||||
self.sig = kwargs['sig']
|
||||
if kwargs.get('global_sig'):
|
||||
self.global_sig = kwargs['global_sig']
|
||||
self.Tag = [t for t in kwargs.pop('Tag', []) if t]
|
||||
if kwargs.get('Object'):
|
||||
self.Object = []
|
||||
for obj in kwargs.pop('Object'):
|
||||
tmp_object = MISPObject(obj['name'])
|
||||
tmp_object.from_dict(**obj)
|
||||
self.Object.append(tmp_object)
|
||||
|
||||
for k, v in kwargs.items():
|
||||
setattr(self, k, v)
|
||||
|
||||
def _json(self):
|
||||
to_return = {'Event': {}}
|
||||
to_return['Event'] = {'distribution': self.distribution, 'info': self.info,
|
||||
'date': self.date.isoformat(), 'published': self.published,
|
||||
'threat_level_id': self.threat_level_id,
|
||||
'analysis': self.analysis, 'Attribute': []}
|
||||
if self.sig:
|
||||
to_return['Event']['sig'] = self.sig
|
||||
if self.global_sig:
|
||||
to_return['Event']['global_sig'] = self.global_sig
|
||||
if self.uuid:
|
||||
to_return['Event']['uuid'] = self.uuid
|
||||
if self.Tag:
|
||||
to_return['Event']['Tag'] = self.Tag
|
||||
if self.Orgc:
|
||||
to_return['Event']['Orgc'] = self.Orgc
|
||||
if self.Galaxy:
|
||||
to_return['Event']['Galaxy'] = self.Galaxy
|
||||
if self.sharing_group_id:
|
||||
to_return['Event']['sharing_group_id'] = self.sharing_group_id
|
||||
to_return['Event'] = _int_to_str(to_return['Event'])
|
||||
if self.attributes:
|
||||
to_return['Event']['Attribute'] = [a._json() for a in self.attributes]
|
||||
jsonschema.validate(to_return, self.json_schema)
|
||||
return to_return
|
||||
# DEPTECATED
|
||||
return self.to_dict()
|
||||
|
||||
def _json_full(self):
|
||||
to_return = self._json()
|
||||
if self.id:
|
||||
to_return['Event']['id'] = self.id
|
||||
if self.orgc_id:
|
||||
to_return['Event']['orgc_id'] = self.orgc_id
|
||||
if self.org_id:
|
||||
to_return['Event']['org_id'] = self.org_id
|
||||
if self.locked is not None:
|
||||
to_return['Event']['locked'] = self.locked
|
||||
if self.attribute_count is not None:
|
||||
to_return['Event']['attribute_count'] = self.attribute_count
|
||||
if self.RelatedEvent:
|
||||
to_return['Event']['RelatedEvent'] = []
|
||||
for rel_event in self.RelatedEvent:
|
||||
to_return['Event']['RelatedEvent'].append(rel_event._json_full())
|
||||
if self.Org:
|
||||
to_return['Event']['Org'] = self.Org
|
||||
if self.sharing_group_id:
|
||||
to_return['Event']['sharing_group_id'] = self.sharing_group_id
|
||||
if self.ShadowAttribute:
|
||||
to_return['Event']['ShadowAttribute'] = self.ShadowAttribute
|
||||
if self.proposal_email_lock is not None:
|
||||
to_return['Event']['proposal_email_lock'] = self.proposal_email_lock
|
||||
if self.locked is not None:
|
||||
to_return['Event']['locked'] = self.locked
|
||||
if self.publish_timestamp:
|
||||
to_return['Event']['publish_timestamp'] = int(time.mktime(self.publish_timestamp.timetuple()))
|
||||
if self.timestamp:
|
||||
# Should never be set on an update, MISP will automatically set it to now
|
||||
to_return['Event']['timestamp'] = int(time.mktime(self.timestamp.timetuple()))
|
||||
to_return['Event'] = _int_to_str(to_return['Event'])
|
||||
if self.attributes:
|
||||
to_return['Event']['Attribute'] = [a._json_full() for a in self.attributes]
|
||||
jsonschema.validate(to_return, self.json_schema)
|
||||
def to_dict(self, with_timestamp=False):
    """Serialize the event as a dict wrapped in an 'Event' key.

    :param with_timestamp: when True, export 'timestamp' and
        'publish_timestamp' as epoch seconds; otherwise drop them so MISP
        regenerates them server-side.
    :returns: ``{'Event': {...}}`` validated against the JSON schema.
    """
    event_dict = super(MISPEvent, self).to_dict()
    if event_dict.get('date'):
        event_dict['date'] = self.date.isoformat()
    if event_dict.get('attributes'):
        raw_attributes = event_dict.pop('attributes')
        event_dict['Attribute'] = [attr.to_dict(with_timestamp) for attr in raw_attributes]
    if event_dict.get('RelatedEvent'):
        event_dict['RelatedEvent'] = [related.to_dict() for related in self.RelatedEvent]
    # Timestamps are exported only on explicit request.
    if with_timestamp and event_dict.get('timestamp'):
        event_dict['timestamp'] = int(time.mktime(self.timestamp.timetuple()))
    else:
        event_dict.pop('timestamp', None)
    if with_timestamp and event_dict.get('publish_timestamp'):
        event_dict['publish_timestamp'] = int(time.mktime(self.publish_timestamp.timetuple()))
    else:
        event_dict.pop('publish_timestamp', None)
    event_dict = _int_to_str(event_dict)
    wrapped = {'Event': event_dict}
    jsonschema.validate(wrapped, self.__json_schema)
    return wrapped
|
||||
|
||||
def add_tag(self, tag):
|
||||
|
@ -581,7 +499,7 @@ class MISPEvent(object):
|
|||
def add_attribute_tag(self, tag, attribute_identifier):
|
||||
attribute = None
|
||||
for a in self.attributes:
|
||||
if (a.id == attribute_identifier or a.uuid == attribute_identifier or
|
||||
if (a.id == attribute_identifier or a.uuid == attribute_identifier or
|
||||
attribute_identifier == a.value or attribute_identifier in a.value.split('|')):
|
||||
a.add_tag(tag)
|
||||
attribute = a
|
||||
|
@ -606,10 +524,160 @@ class MISPEvent(object):
|
|||
raise Exception('No attribute with UUID/ID {} found.'.format(attribute_id))
|
||||
|
||||
def add_attribute(self, type, value, **kwargs):
    """Add an attribute to the event.

    :param type: MISP attribute type (e.g. 'ip-dst'); note the parameter
        name shadows the builtin but is kept for caller compatibility.
    :param value: a single value, or a list of values (one attribute is
        created per element, all sharing the same type and kwargs).
    :param kwargs: forwarded to MISPAttribute.set_all_values.
    """
    if isinstance(value, list):
        # Fan out: one attribute per value.
        for a in value:
            self.add_attribute(type, a, **kwargs)
    else:
        # Bug fix: the attribute was instantiated before the list check,
        # creating a wasted MISPAttribute on every recursion level.
        attribute = MISPAttribute()
        attribute.set_all_values(type=type, value=value, **kwargs)
        self.attributes.append(attribute)
|
||||
|
||||
|
||||
class MISPObjectReference(AbstractMISP):
    """A typed link from one MISP object to another (by UUID)."""

    def __init__(self):
        super(MISPObjectReference, self).__init__()

    def from_dict(self, object_uuid, referenced_uuid, relationship_type, comment=None, **kwargs):
        """Populate the reference.

        :param object_uuid: UUID of the object holding the reference.
        :param referenced_uuid: UUID of the object being referenced.
        :param relationship_type: relationship name (e.g. 'included-in').
        :param comment: optional free-text comment.
        :param kwargs: extra fields set verbatim as attributes.
        """
        self.object_uuid = object_uuid
        self.referenced_uuid = referenced_uuid
        self.relationship_type = relationship_type
        self.comment = comment
        # Bug fix: iterating a dict yields keys only, so `for k, v in kwargs:`
        # raised ValueError on any kwarg; items() yields the (key, value) pairs.
        for k, v in kwargs.items():
            setattr(self, k, v)
|
||||
|
||||
|
||||
class MISPObjectAttribute(MISPAttribute):
    """An attribute that belongs to a MISP object template.

    Missing type/to_ids/disable_correlation values fall back to the
    attribute's entry in the object template definition.
    """

    def __init__(self, definition):
        # NOTE(review): super() deliberately names MISPAttribute, so
        # MISPAttribute.__init__ itself is skipped and the grandparent's
        # __init__ runs instead — confirm this is intended.
        super(MISPAttribute, self).__init__()
        self.__definition = definition

    def from_dict(self, object_relation, value, **kwargs):
        """Populate the attribute, defaulting unset fields from the template.

        :param object_relation: relation name within the object template.
        :param value: the attribute value.
        :param kwargs: optional overrides (type, to_ids, disable_correlation, ...).
        """
        self.object_relation = object_relation
        self.value = value
        # Each field is taken from kwargs when given, otherwise from the
        # object template definition (e.g. the template may disable
        # correlation or set to_ids by default).
        for field, definition_key in (('type', 'misp-attribute'),
                                      ('disable_correlation', 'disable_correlation'),
                                      ('to_ids', 'to_ids')):
            overridden = kwargs.pop(field, None)
            if overridden is None:
                overridden = self.__definition.get(definition_key)
            setattr(self, field, overridden)
        kwargs.update(**self)
        # NOTE(review): targets MISPAttribute, so MISPAttribute's own
        # from_dict (if any) is bypassed in favour of the grandparent's —
        # confirm against AbstractMISP.
        super(MISPAttribute, self).from_dict(**kwargs)
|
||||
|
||||
|
||||
class MISPObject(AbstractMISP):
    """A MISP object instantiated from a template shipped in
    pymisp/data/misp-objects/objects/<name>/definition.json."""

    def __init__(self, name, strict=True):
        """Load the template definition for *name*.

        :param name: object template name (directory under misp-objects/objects).
        :param strict: raise UnknownMISPObjectTemplate when the template is
            unknown instead of degrading to a free-form object.
        """
        super(MISPObject, self).__init__()
        self.__strict = strict
        self.name = name
        self.__misp_objects_path = os.path.join(
            os.path.abspath(os.path.dirname(sys.modules['pymisp'].__file__)),
            'data', 'misp-objects', 'objects')
        definition_path = os.path.join(self.__misp_objects_path, self.name, 'definition.json')
        if os.path.exists(definition_path):
            self.__known_template = True
        else:
            if self.__strict:
                # Bug fix: the '{}' placeholder was never formatted, so the
                # error message did not name the offending template.
                raise UnknownMISPObjectTemplate('{} is unknown in the MISP object directory.'.format(self.name))
            else:
                self.__known_template = False
        if self.__known_template:
            with open(definition_path, 'r') as f:
                self.__definition = json.load(f)
            # 'meta-category' contains a dash, so it cannot be assigned with
            # plain attribute syntax.
            setattr(self, 'meta-category', self.__definition['meta-category'])
            self.template_uuid = self.__definition['uuid']
            self.description = self.__definition['description']
            self.template_version = self.__definition['version']
        else:
            # FIXME We need to set something for meta-category, template_uuid,
            # description and template_version
            pass
        self.uuid = str(uuid.uuid4())
        self.Attribute = []
        self.ObjectReference = []

    def from_dict(self, **kwargs):
        """Populate the object from a dict (typically a MISP JSON payload).

        :raises UnknownMISPObjectTemplate: in strict mode, when the payload's
            template UUID or version disagrees with the local template.
        """
        if self.__known_template:
            if kwargs.get('template_uuid') and kwargs['template_uuid'] != self.template_uuid:
                if self.__strict:
                    raise UnknownMISPObjectTemplate('UUID of the object is different from the one of the template.')
                else:
                    self.__known_template = False
            if kwargs.get('template_version') and int(kwargs['template_version']) != self.template_version:
                # Bug fix: `self.strict` does not exist (the attribute is the
                # name-mangled self.__strict), so this branch always raised
                # AttributeError instead of the intended exception.
                if self.__strict:
                    raise UnknownMISPObjectTemplate('Version of the object ({}) is different from the one of the template ({}).'.format(kwargs['template_version'], self.template_version))
                else:
                    self.__known_template = False

        for key, value in kwargs.items():
            if key == 'Attribute':
                for v in value:
                    self.add_attribute(**v)
            elif key == 'ObjectReference':
                for v in value:
                    self.add_reference(**v)
            else:
                setattr(self, key, value)

    def to_dict(self, strict=True):
        """Serialize; validates against the template first unless disabled."""
        # Parentheses make the original precedence explicit: validate when
        # strict is requested, or when the object is strict AND templated.
        if strict or (self.__strict and self.__known_template):
            self._validate()
        return super(MISPObject, self).to_dict()

    def to_json(self, strict=True):
        """JSON serialization; same validation rule as to_dict."""
        if strict or (self.__strict and self.__known_template):
            self._validate()
        return super(MISPObject, self).to_json()

    def _validate(self):
        """Make sure the object we're creating has the required fields.

        :raises InvalidMISPObject: on duplicate single-valued relations or
            missing required/requiredOneOf relations.
        """
        all_object_relations = []
        for a in self.Attribute:
            all_object_relations.append(a.object_relation)
        count_relations = dict(Counter(all_object_relations))
        for key, counter in count_relations.items():
            if counter == 1:
                continue
            if not self.__definition['attributes'][key].get('multiple'):
                raise InvalidMISPObject('Multiple occurrences of {} is not allowed'.format(key))
        all_attribute_names = set(count_relations.keys())
        if self.__definition.get('requiredOneOf'):
            if not set(self.__definition['requiredOneOf']) & all_attribute_names:
                raise InvalidMISPObject('At least one of the following attributes is required: {}'.format(', '.join(self.__definition['requiredOneOf'])))
        if self.__definition.get('required'):
            for r in self.__definition.get('required'):
                if r not in all_attribute_names:
                    raise InvalidMISPObject('{} is required'.format(r))
        return True

    def add_reference(self, referenced_uuid, relationship_type, comment=None, **kwargs):
        """Add a link (uuid) to an other object."""
        if kwargs.get('object_uuid'):
            # Bug fix: pop() is required — leaving 'object_uuid' in kwargs
            # passed it twice to from_dict (explicitly and via **kwargs),
            # raising TypeError when loading an existing reference.
            object_uuid = kwargs.pop('object_uuid')
        else:
            # New reference: it originates from this object.
            object_uuid = self.uuid
        reference = MISPObjectReference()
        reference.from_dict(object_uuid=object_uuid, referenced_uuid=referenced_uuid,
                            relationship_type=relationship_type, comment=comment, **kwargs)
        self.ObjectReference.append(reference)

    def add_attribute(self, object_relation, **value):
        """Add one attribute under the given template relation.

        :returns: the created MISPObjectAttribute, or None when no value.
        """
        if value.get('value') is None:
            return None
        if self.__known_template:
            attribute = MISPObjectAttribute(self.__definition['attributes'][object_relation])
        else:
            # Unknown template: no per-relation definition to fall back on.
            attribute = MISPObjectAttribute({})
        attribute.from_dict(object_relation, **value)
        self.Attribute.append(attribute)
        return attribute
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
from .neo4j import Neo4j # noqa
|
||||
from .fileobject import FileObject # noqa
|
||||
from .peobject import PEObject, PESectionObject # noqa
|
||||
from .elfobject import ELFObject, ELFSectionObject # noqa
|
||||
from .machoobject import MachOObject, MachOSectionObject # noqa
|
||||
from .create_misp_object import make_binary_objects # noqa
|
||||
from .abstractgenerator import AbstractMISPObjectGenerator # noqa
|
|
@ -0,0 +1,16 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import abc
|
||||
import six
|
||||
from .. import MISPObject
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)  # Remove that line when discarding python2 support.
# Python3 way: class MISPObjectGenerator(metaclass=abc.ABCMeta):
class AbstractMISPObjectGenerator(MISPObject):
    """Base class for helpers that build a MISP object from some data source
    (a file on disk, a parsed binary, ...)."""

    @abc.abstractmethod
    def generate_attributes(self):
        """Contains the logic where all the values of the object are gathered"""
        pass
|
|
@ -0,0 +1,69 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from . import FileObject, PEObject, ELFObject, MachOObject
|
||||
from ..exceptions import MISPObjectException
|
||||
import warnings
|
||||
|
||||
try:
|
||||
import lief
|
||||
from lief import Logger
|
||||
Logger.disable()
|
||||
HAS_LIEF = True
|
||||
except ImportError:
|
||||
HAS_LIEF = False
|
||||
|
||||
|
||||
class FileTypeNotImplemented(MISPObjectException):
|
||||
pass
|
||||
|
||||
|
||||
def make_pe_objects(lief_parsed, misp_file):
    """Attach PE indicators to *misp_file* and return the related objects.

    :returns: (misp_file, pe_object, list of PE section objects)
    """
    pe_object = PEObject(parsed=lief_parsed)
    misp_file.add_reference(pe_object.uuid, 'included-in', 'PE indicators')
    pe_sections = [section for section in pe_object.sections]
    return misp_file, pe_object, pe_sections
|
||||
|
||||
|
||||
def make_elf_objects(lief_parsed, misp_file):
    """Attach ELF indicators to *misp_file* and return the related objects.

    :returns: (misp_file, elf_object, list of ELF section objects)
    """
    elf_object = ELFObject(parsed=lief_parsed)
    misp_file.add_reference(elf_object.uuid, 'included-in', 'ELF indicators')
    elf_sections = [section for section in elf_object.sections]
    return misp_file, elf_object, elf_sections
|
||||
|
||||
|
||||
def make_macho_objects(lief_parsed, misp_file):
    """Attach Mach-O indicators to *misp_file* and return the related objects.

    :returns: (misp_file, macho_object, list of Mach-O section objects)
    """
    macho_object = MachOObject(parsed=lief_parsed)
    misp_file.add_reference(macho_object.uuid, 'included-in', 'MachO indicators')
    macho_sections = [section for section in macho_object.sections]
    return misp_file, macho_object, macho_sections
|
||||
|
||||
|
||||
def make_binary_objects(filepath):
    """Build MISP objects (file + format-specific + sections) for a binary.

    :param filepath: path to the binary on disk.
    :returns: (file_object, binary_object_or_None, sections_or_None); the
        last two are None when lief is missing or the format is unsupported.
    """
    misp_file = FileObject(filepath)
    if HAS_LIEF:
        try:
            lief_parsed = lief.parse(filepath)
            if isinstance(lief_parsed, lief.PE.Binary):
                return make_pe_objects(lief_parsed, misp_file)
            elif isinstance(lief_parsed, lief.ELF.Binary):
                return make_elf_objects(lief_parsed, misp_file)
            elif isinstance(lief_parsed, lief.MachO.Binary):
                return make_macho_objects(lief_parsed, misp_file)
        # Bug fix: warnings.warn's second positional argument is the warning
        # *category* (a class); passing the exception object there was wrong.
        # The exception text now goes into the message itself.
        except lief.bad_format as e:
            warnings.warn('\tBad format: {}'.format(e))
        except lief.bad_file as e:
            warnings.warn('\tBad file: {}'.format(e))
        except lief.parser_error as e:
            warnings.warn('\tParser error: {}'.format(e))
        except FileTypeNotImplemented as e:  # noqa
            # Bug fix: warn() expects a str or Warning instance, not an
            # arbitrary exception object.
            warnings.warn(str(e))
    else:
        warnings.warn('Please install lief, documentation here: https://github.com/lief-project/LIEF')
    return misp_file, None, None
|
|
@ -0,0 +1,93 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from .abstractgenerator import AbstractMISPObjectGenerator
|
||||
from io import BytesIO
|
||||
from hashlib import md5, sha1, sha256, sha512
|
||||
import warnings
|
||||
|
||||
|
||||
try:
|
||||
import lief
|
||||
HAS_LIEF = True
|
||||
except ImportError:
|
||||
HAS_LIEF = False
|
||||
|
||||
try:
|
||||
import pydeep
|
||||
HAS_PYDEEP = True
|
||||
except ImportError:
|
||||
HAS_PYDEEP = False
|
||||
|
||||
|
||||
class ELFObject(AbstractMISPObjectGenerator):
    """Build a MISP 'elf' object (plus section sub-objects) via lief."""

    def __init__(self, parsed=None, filepath=None, pseudofile=None):
        """Accept exactly one input: a parsed lief binary, a path, or an
        in-memory pseudo file (BytesIO or bytes)."""
        if not HAS_PYDEEP:
            warnings.warn("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git")
        if not HAS_LIEF:
            raise ImportError('Please install lief, documentation here: https://github.com/lief-project/LIEF')
        if pseudofile:
            if isinstance(pseudofile, BytesIO):
                self.__elf = lief.ELF.parse(raw=pseudofile.getvalue())
            elif isinstance(pseudofile, bytes):
                self.__elf = lief.ELF.parse(raw=pseudofile)
            else:
                raise Exception('Pseudo file can be BytesIO or bytes got {}'.format(type(pseudofile)))
        elif filepath:
            self.__elf = lief.ELF.parse(filepath)
        elif parsed:
            # Got an already parsed blob
            if isinstance(parsed, lief.ELF.Binary):
                self.__elf = parsed
            else:
                raise Exception('Not a lief.ELF.Binary: {}'.format(type(parsed)))
        super(ELFObject, self).__init__('elf')
        self.generate_attributes()
        # ObjectReference entries are serialised by hand, keep them out of
        # the automatic JSON dump.
        self.update_not_jsonable('ObjectReference')

    def generate_attributes(self):
        """Extract header fields and one ELFSectionObject per section."""
        elf_header = self.__elf.header
        # lief enums stringify as 'ENUM.VALUE'; keep only the value part.
        self.add_attribute('type', value=str(elf_header.file_type).split('.')[1])
        self.add_attribute('entrypoint-address', value=self.__elf.entrypoint)
        self.add_attribute('arch', value=str(elf_header.machine_type).split('.')[1])
        self.add_attribute('os_abi', value=str(elf_header.identity_os_abi).split('.')[1])
        # Sections: one sub-object each, referenced by position.
        self.sections = []
        if self.__elf.sections:
            for pos, section in enumerate(self.__elf.sections):
                section_object = ELFSectionObject(section)
                self.add_reference(section_object.uuid, 'included-in', 'Section {} of ELF'.format(pos))
                self.sections.append(section_object)
        self.add_attribute('number-sections', value=len(self.sections))
|
||||
|
||||
|
||||
class ELFSectionObject(AbstractMISPObjectGenerator):
    """Build a MISP 'elf-section' object for a single ELF section."""

    def __init__(self, section):
        super(ELFSectionObject, self).__init__('elf-section')
        self.__section = section
        self.__data = bytes(self.__section.content)
        self.generate_attributes()
        # ObjectReference entries are serialised by hand, keep them out of
        # the automatic JSON dump.
        self.update_not_jsonable('ObjectReference')

    def generate_attributes(self):
        """Extract name, type, flags, size, entropy and hashes."""
        self.add_attribute('name', value=self.__section.name)
        self.add_attribute('type', value=str(self.__section.type).split('.')[1])
        for flag in self.__section.flags_list:
            self.add_attribute('flag', value=str(flag).split('.')[1])
        size_attribute = self.add_attribute('size-in-bytes', value=self.__section.size)
        # Entropy and digests are meaningless for an empty section.
        if int(size_attribute.value) > 0:
            self.add_attribute('entropy', value=self.__section.entropy)
            for algo_name, algo in (('md5', md5), ('sha1', sha1),
                                    ('sha256', sha256), ('sha512', sha512)):
                self.add_attribute(algo_name, value=algo(self.__data).hexdigest())
            if HAS_PYDEEP:
                self.add_attribute('ssdeep', value=pydeep.hash_buf(self.__data).decode())
|
|
@ -0,0 +1,81 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from .abstractgenerator import AbstractMISPObjectGenerator
|
||||
import os
|
||||
from io import BytesIO
|
||||
from hashlib import md5, sha1, sha256, sha512
|
||||
import math
|
||||
from collections import Counter
|
||||
import warnings
|
||||
|
||||
try:
|
||||
import pydeep
|
||||
HAS_PYDEEP = True
|
||||
except ImportError:
|
||||
HAS_PYDEEP = False
|
||||
|
||||
try:
|
||||
import magic
|
||||
HAS_MAGIC = True
|
||||
except ImportError:
|
||||
HAS_MAGIC = False
|
||||
|
||||
|
||||
class FileObject(AbstractMISPObjectGenerator):
    """Build a MISP 'file' object (hashes, size, entropy, mimetype, ssdeep)."""

    def __init__(self, filepath=None, pseudofile=None, filename=None):
        """Accept either a path on disk or an in-memory pseudo file.

        :param filepath: path to the file (takes precedence).
        :param pseudofile: in-memory content, BytesIO or raw bytes.
        :param filename: file name to record when using a pseudo file.
        """
        if not HAS_PYDEEP:
            warnings.warn("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git")
        if not HAS_MAGIC:
            warnings.warn("Please install python-magic: pip install python-magic.")
        if filepath:
            self.filepath = filepath
            self.filename = os.path.basename(self.filepath)
            with open(filepath, 'rb') as f:
                self.__pseudofile = BytesIO(f.read())
        elif pseudofile and isinstance(pseudofile, BytesIO):
            # WARNING: lief.parse requires a path
            self.filepath = None
            self.__pseudofile = pseudofile
            self.filename = filename
        elif pseudofile and isinstance(pseudofile, bytes):
            # Generalization: also accept raw bytes, consistent with the
            # PE/ELF/MachO generators in this package (backward compatible).
            self.filepath = None
            self.__pseudofile = BytesIO(pseudofile)
            self.filename = filename
        else:
            raise Exception('File buffer (BytesIO or bytes) or a path is required.')
        # PY3 way:
        # super().__init__('file')
        super(FileObject, self).__init__('file')
        self.__data = self.__pseudofile.getvalue()
        self.generate_attributes()
        # ObjectReference entries are serialised by hand, keep them out of
        # the automatic JSON dump.
        self.update_not_jsonable('ObjectReference')

    def generate_attributes(self):
        """Extract filename, size, entropy, digests, mimetype and ssdeep."""
        self.add_attribute('filename', value=self.filename)
        size = self.add_attribute('size-in-bytes', value=len(self.__data))
        # Digests and entropy are meaningless for an empty file.
        if int(size.value) > 0:
            self.add_attribute('entropy', value=self.__entropy_H(self.__data))
            self.add_attribute('md5', value=md5(self.__data).hexdigest())
            self.add_attribute('sha1', value=sha1(self.__data).hexdigest())
            self.add_attribute('sha256', value=sha256(self.__data).hexdigest())
            self.add_attribute('sha512', value=sha512(self.__data).hexdigest())
            self.add_attribute('malware-sample', value=self.filename, data=self.__pseudofile)
            if HAS_MAGIC:
                self.add_attribute('mimetype', value=magic.from_buffer(self.__data))
            if HAS_PYDEEP:
                self.add_attribute('ssdeep', value=pydeep.hash_buf(self.__data).decode())

    def __entropy_H(self, data):
        """Calculate the Shannon entropy (bits/byte) of a chunk of data."""
        # NOTE: copy of the entropy function from pefile
        if len(data) == 0:
            return 0.0

        occurences = Counter(bytearray(data))

        entropy = 0
        for x in occurences.values():
            p_x = float(x) / len(data)
            entropy -= p_x * math.log(p_x, 2)

        return entropy
|
|
@ -0,0 +1,92 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from .abstractgenerator import AbstractMISPObjectGenerator
|
||||
from io import BytesIO
|
||||
from hashlib import md5, sha1, sha256, sha512
|
||||
import warnings
|
||||
|
||||
|
||||
try:
|
||||
import lief
|
||||
HAS_LIEF = True
|
||||
except ImportError:
|
||||
HAS_LIEF = False
|
||||
|
||||
try:
|
||||
import pydeep
|
||||
HAS_PYDEEP = True
|
||||
except ImportError:
|
||||
HAS_PYDEEP = False
|
||||
|
||||
|
||||
class MachOObject(AbstractMISPObjectGenerator):
    """Build a MISP 'macho' object (plus section sub-objects) via lief."""

    def __init__(self, parsed=None, filepath=None, pseudofile=None):
        """Accept exactly one input: a parsed lief binary, a path, or an
        in-memory pseudo file (BytesIO or bytes)."""
        if not HAS_PYDEEP:
            warnings.warn("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git")
        if not HAS_LIEF:
            raise ImportError('Please install lief, documentation here: https://github.com/lief-project/LIEF')
        if pseudofile:
            if isinstance(pseudofile, BytesIO):
                self.__macho = lief.MachO.parse(raw=pseudofile.getvalue())
            elif isinstance(pseudofile, bytes):
                self.__macho = lief.MachO.parse(raw=pseudofile)
            else:
                raise Exception('Pseudo file can be BytesIO or bytes got {}'.format(type(pseudofile)))
        elif filepath:
            self.__macho = lief.MachO.parse(filepath)
        elif parsed:
            # Got an already parsed blob
            if isinstance(parsed, lief.MachO.Binary):
                self.__macho = parsed
            else:
                raise Exception('Not a lief.MachO.Binary: {}'.format(type(parsed)))
        super(MachOObject, self).__init__('macho')
        self.generate_attributes()
        # Consistency fix: the sibling generators (FileObject, ELFObject,
        # PEObject) pass the field name as a string; passing a list here
        # stored the list object itself, so 'ObjectReference' was never
        # actually excluded from the JSON dump.
        self.update_not_jsonable('ObjectReference')

    def generate_attributes(self):
        """Extract header fields and one MachOSectionObject per section."""
        self.add_attribute('type', value=str(self.__macho.header.file_type).split('.')[1])
        self.add_attribute('name', value=self.__macho.name)
        # Not every Mach-O binary has an entrypoint (e.g. libraries).
        if self.__macho.has_entrypoint:
            self.add_attribute('entrypoint-address', value=self.__macho.entrypoint)
        # Sections: one sub-object each, referenced by position.
        self.sections = []
        if self.__macho.sections:
            pos = 0
            for section in self.__macho.sections:
                s = MachOSectionObject(section)
                self.add_reference(s.uuid, 'included-in', 'Section {} of MachO'.format(pos))
                pos += 1
                self.sections.append(s)
        self.add_attribute('number-sections', value=len(self.sections))
|
||||
|
||||
|
||||
class MachOSectionObject(AbstractMISPObjectGenerator):
    """Build a MISP 'macho-section' object for a single Mach-O section."""

    def __init__(self, section):
        super(MachOSectionObject, self).__init__('macho-section')
        self.__section = section
        self.__data = bytes(self.__section.content)
        self.generate_attributes()
        # Consistency fix: siblings pass the field name as a string; passing
        # a list stored the list object itself, so 'ObjectReference' was
        # never actually excluded from the JSON dump.
        self.update_not_jsonable('ObjectReference')

    def generate_attributes(self):
        """Extract name, size, entropy and hashes of the section."""
        self.add_attribute('name', value=self.__section.name)
        size = self.add_attribute('size-in-bytes', value=self.__section.size)
        # Entropy and digests are meaningless for an empty section.
        if int(size.value) > 0:
            self.add_attribute('entropy', value=self.__section.entropy)
            self.add_attribute('md5', value=md5(self.__data).hexdigest())
            self.add_attribute('sha1', value=sha1(self.__data).hexdigest())
            self.add_attribute('sha256', value=sha256(self.__data).hexdigest())
            self.add_attribute('sha512', value=sha512(self.__data).hexdigest())
            if HAS_PYDEEP:
                self.add_attribute('ssdeep', value=pydeep.hash_buf(self.__data).decode())
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
import glob
|
||||
import os
|
||||
from pymisp import MISPEvent
|
||||
from .. import MISPEvent
|
||||
|
||||
try:
|
||||
from py2neo import authenticate, Graph, Node, Relationship
|
||||
|
@ -53,5 +53,5 @@ class Neo4j():
|
|||
av = Relationship(attr_node, "is", val)
|
||||
s = val | ev | av
|
||||
tx.merge(s)
|
||||
#tx.graph.push(s)
|
||||
# tx.graph.push(s)
|
||||
tx.commit()
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
import os
|
||||
|
||||
from pymisp import MISPEvent
|
||||
from .. import MISPEvent
|
||||
try:
|
||||
from bs4 import BeautifulSoup
|
||||
has_bs4 = True
|
||||
|
|
|
@ -0,0 +1,140 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from .abstractgenerator import AbstractMISPObjectGenerator
|
||||
from io import BytesIO
|
||||
from hashlib import md5, sha1, sha256, sha512
|
||||
from datetime import datetime
|
||||
import warnings
|
||||
|
||||
|
||||
try:
|
||||
import lief
|
||||
HAS_LIEF = True
|
||||
except ImportError:
|
||||
HAS_LIEF = False
|
||||
|
||||
try:
|
||||
import pydeep
|
||||
HAS_PYDEEP = True
|
||||
except ImportError:
|
||||
HAS_PYDEEP = False
|
||||
|
||||
|
||||
class PEObject(AbstractMISPObjectGenerator):
    """Build a MISP 'pe' object (plus section sub-objects) via lief."""

    def __init__(self, parsed=None, filepath=None, pseudofile=None):
        """Accept exactly one input: a parsed lief binary, a path, or an
        in-memory pseudo file (BytesIO or bytes)."""
        if not HAS_PYDEEP:
            warnings.warn("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git")
        if not HAS_LIEF:
            raise ImportError('Please install lief, documentation here: https://github.com/lief-project/LIEF')
        if pseudofile:
            if isinstance(pseudofile, BytesIO):
                self.__pe = lief.PE.parse(raw=pseudofile.getvalue())
            elif isinstance(pseudofile, bytes):
                self.__pe = lief.PE.parse(raw=pseudofile)
            else:
                raise Exception('Pseudo file can be BytesIO or bytes got {}'.format(type(pseudofile)))
        elif filepath:
            self.__pe = lief.PE.parse(filepath)
        elif parsed:
            # Got an already parsed blob
            if isinstance(parsed, lief.PE.Binary):
                self.__pe = parsed
            else:
                raise Exception('Not a lief.PE.Binary: {}'.format(type(parsed)))
        super(PEObject, self).__init__('pe')
        self.generate_attributes()
        # ObjectReference entries are serialised by hand, keep them out of
        # the automatic JSON dump.
        self.update_not_jsonable('ObjectReference')

    def _is_exe(self):
        """True when the binary is a plain executable (not a DLL or driver)."""
        if not self._is_dll() and not self._is_driver():
            return self.__pe.header.has_characteristic(lief.PE.HEADER_CHARACTERISTICS.EXECUTABLE_IMAGE)
        return False

    def _is_dll(self):
        """True when the DLL characteristic flag is set."""
        return self.__pe.header.has_characteristic(lief.PE.HEADER_CHARACTERISTICS.DLL)

    def _is_driver(self):
        """Heuristic: importing a kernel-mode system DLL marks a driver."""
        # List from pefile
        system_DLLs = {'ntoskrnl.exe', 'hal.dll', 'ndis.sys', 'bootvid.dll', 'kdcom.dll'}
        if system_DLLs.intersection([imp.lower() for imp in self.__pe.libraries]):
            return True
        return False

    def _get_pe_type(self):
        """Classify the binary as dll / driver / exe / unknown."""
        if self._is_dll():
            return 'dll'
        elif self._is_driver():
            return 'driver'
        elif self._is_exe():
            return 'exe'
        else:
            return 'unknown'

    def generate_attributes(self):
        """Extract header fields, version resources and section sub-objects."""
        self.add_attribute('type', value=self._get_pe_type())
        # General information
        self.add_attribute('entrypoint-address', value=self.__pe.entrypoint)
        self.add_attribute('compilation-timestamp', value=datetime.utcfromtimestamp(self.__pe.header.time_date_stamps).isoformat())
        # self.imphash = self.__pe.get_imphash()
        try:
            if (self.__pe.has_resources and
                    self.__pe.resources_manager.has_version and
                    self.__pe.resources_manager.version.has_string_file_info and
                    self.__pe.resources_manager.version.string_file_info.langcode_items):
                # Only the first language code entry is exported.
                langcode_item = self.__pe.resources_manager.version.string_file_info.langcode_items[0]
                fileinfo = dict(langcode_item.items.items())
                self.add_attribute('original-filename', value=fileinfo.get('OriginalFilename'))
                self.add_attribute('internal-filename', value=fileinfo.get('InternalName'))
                self.add_attribute('file-description', value=fileinfo.get('FileDescription'))
                self.add_attribute('file-version', value=fileinfo.get('FileVersion'))
                self.add_attribute('lang-id', value=langcode_item.key)
                self.add_attribute('product-name', value=fileinfo.get('ProductName'))
                self.add_attribute('product-version', value=fileinfo.get('ProductVersion'))
                self.add_attribute('company-name', value=fileinfo.get('CompanyName'))
                self.add_attribute('legal-copyright', value=fileinfo.get('LegalCopyright'))
        except lief.read_out_of_bound:
            # The file is corrupted
            pass
        # Sections: one sub-object each; also record which one holds the
        # entrypoint.
        self.sections = []
        if self.__pe.sections:
            for pos, section in enumerate(self.__pe.sections):
                section_object = PESectionObject(section)
                self.add_reference(section_object.uuid, 'included-in', 'Section {} of PE'.format(pos))
                if ((self.__pe.entrypoint >= section.virtual_address) and
                        (self.__pe.entrypoint < (section.virtual_address + section.virtual_size))):
                    self.add_attribute('entrypoint-section-at-position', value='{}|{}'.format(section.name, pos))
                self.sections.append(section_object)
        self.add_attribute('number-sections', value=len(self.sections))
        # TODO: TLSSection / DIRECTORY_ENTRY_TLS
|
||||
|
||||
|
||||
class PESectionObject(AbstractMISPObjectGenerator):
    """Build a MISP 'pe-section' object for a single PE section."""

    def __init__(self, section):
        super(PESectionObject, self).__init__('pe-section')
        self.__section = section
        self.__data = bytes(self.__section.content)
        self.generate_attributes()
        # ObjectReference entries are serialised by hand, keep them out of
        # the automatic JSON dump.
        self.update_not_jsonable('ObjectReference')

    def generate_attributes(self):
        """Extract name, size, entropy and hashes of the section."""
        self.add_attribute('name', value=self.__section.name)
        size_attribute = self.add_attribute('size-in-bytes', value=self.__section.size)
        # Entropy and digests are meaningless for an empty section.
        if int(size_attribute.value) > 0:
            self.add_attribute('entropy', value=self.__section.entropy)
            for algo_name, algo in (('md5', md5), ('sha1', sha1),
                                    ('sha256', sha256), ('sha512', sha512)):
                self.add_attribute(algo_name, value=algo(self.__data).hexdigest())
            if HAS_PYDEEP:
                self.add_attribute('ssdeep', value=pydeep.hash_buf(self.__data).decode())
|
9
setup.py
9
setup.py
|
@ -1,4 +1,4 @@
|
|||
#!/usr/bin/python
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
from setuptools import setup
|
||||
import pymisp
|
||||
|
@ -27,7 +27,10 @@ setup(
|
|||
'Topic :: Internet',
|
||||
],
|
||||
test_suite="tests",
|
||||
install_requires=['requests', 'python-dateutil', 'jsonschema'],
|
||||
install_requires=['six', 'requests', 'python-dateutil', 'jsonschema'],
|
||||
include_package_data=True,
|
||||
package_data={'data': ['schema.json', 'schema-lax.json', 'describeTypes.json']},
|
||||
package_data={'pymisp': ['data/*.json', 'data/misp-objects/schema_objects.json',
|
||||
'data/misp-objects/schema_relationships.json',
|
||||
'data/misp-objects/objects/*/definition.json',
|
||||
'data/misp-objects/relationships/definition.json']},
|
||||
)
|
||||
|
|
|
@ -10,8 +10,9 @@ import pymisp as pm
|
|||
from pymisp import PyMISP
|
||||
# from pymisp import NewEventError
|
||||
from pymisp import MISPEvent
|
||||
from pymisp import EncodeUpdate
|
||||
from pymisp import EncodeFull
|
||||
from pymisp import MISPEncode
|
||||
|
||||
from pymisp.tools import make_binary_objects
|
||||
|
||||
|
||||
@requests_mock.Mocker()
|
||||
|
@ -122,8 +123,7 @@ class TestOffline(unittest.TestCase):
|
|||
misp_event = MISPEvent(pymisp.describe_types)
|
||||
with open('tests/57c4445b-c548-4654-af0b-4be3950d210f.json', 'r') as f:
|
||||
misp_event.load(f.read())
|
||||
json.dumps(misp_event, cls=EncodeUpdate)
|
||||
json.dumps(misp_event, cls=EncodeFull)
|
||||
json.dumps(misp_event, cls=MISPEncode)
|
||||
|
||||
def test_searchIndexByTagId(self, m):
|
||||
self.initURI(m)
|
||||
|
@ -210,5 +210,33 @@ class TestOffline(unittest.TestCase):
|
|||
p.add_internal_other(evt, 'foobar')
|
||||
p.add_attachment(evt, "testFile")
|
||||
|
||||
def make_objects(self, path):
    """Run make_binary_objects on *path* and serialize the resulting
    MISP objects and their references with MISPEncode.

    Append order is sections first, then the PE object, then the file
    object, matching what the MISP instance expects.
    """
    collected = {'objects': [], 'references': []}
    fo, peo, seos = make_binary_objects(path)

    ordered = list(seos) if seos else []
    if peo:
        ordered.append(peo)
    if fo:
        ordered.append(fo)

    for misp_object in ordered:
        collected['objects'].append(misp_object)
        if misp_object.ObjectReference:
            collected['references'] += misp_object.ObjectReference

    return json.dumps(collected, cls=MISPEncode)
|
||||
|
||||
def test_objects(self, m):
    """Smoke-test object generation on the viper test binaries
    (PE, ELF and Mach-O samples)."""
    base_dir = os.path.join('tests', 'viper-test-files', 'test_files')
    for sample in ('cmd.exe', 'tmux', 'MachO-OSX-x64-ls'):
        print(self.make_objects(os.path.join(base_dir, sample)))
|
||||
|
||||
# Allow running this test module directly (outside a test runner).
if __name__ == '__main__':
    unittest.main()
|
||||
|
|
Loading…
Reference in New Issue