chg: Deprecate everything in PyMISP

pull/417/head
Raphaël Vinot 2019-07-12 17:35:02 +02:00
parent 4de403c537
commit c9d58dad8a
11 changed files with 2119 additions and 418 deletions

View File

@ -3,11 +3,6 @@ name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true
[[source]]
name = "lief_index"
url = "https://lief-project.github.io/packages/"
verify_ssl = true
[dev-packages]
nose = "*"
coveralls = "*"
@ -17,7 +12,6 @@ requests-mock = "*"
[packages]
pymisp = {editable = true,extras = ["fileobjects", "neo", "openioc", "virustotal", "pdfexport"],path = "."}
pymispwarninglists = {editable = true,git = "https://github.com/MISP/PyMISPWarningLists.git"}
lief = {version = ">=0.10.0.dev0",index = "lief_index",markers = "python_version >= '3.5'"}
[requires]
python_version = "3"

80
Pipfile.lock generated
View File

@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
"sha256": "4056bb4063c740e772370f0dc360c08ac4e45bdbee16d0717aa1ef2698c08653"
"sha256": "92d8e062fe9d5baadd6145057fd6bd30db2c696628a6b3d697ae66431f4dace0"
},
"pipfile-spec": 6,
"requires": {
@ -12,11 +12,6 @@
"name": "pypi",
"url": "https://pypi.org/simple",
"verify_ssl": true
},
{
"name": "lief_index",
"url": "https://lief-project.github.io/packages/",
"verify_ssl": true
}
]
},
@ -71,6 +66,13 @@
],
"version": "==4.4.0"
},
"deprecated": {
"hashes": [
"sha256:a515c4cf75061552e0284d123c3066fbbe398952c87333a92b8fc3dd8e4f9cc1",
"sha256:b07b414c8aac88f60c1d837d21def7e83ba711052e03b3cbaff27972567a8f8d"
],
"version": "==1.2.6"
},
"idna": {
"hashes": [
"sha256:c357b3f628cf53ae2c4c05627ecc484553142ca23264e593d327bcde5e9c3407",
@ -100,8 +102,6 @@
"sha256:efa5f3523c01f7f0f5f2c14e5ac808e2447d1435c6a2872e5ab1a97ef1b0db9b",
"sha256:f1aadb344b5e14b308167bd2c9f31f1915e3c4e3f9a9ca92ff7b7bfbede5034c"
],
"index": "lief_index",
"markers": "python_version >= '3.5'",
"version": "==0.10.0.dev0"
},
"neobolt": {
@ -118,34 +118,34 @@
},
"pillow": {
"hashes": [
"sha256:15c056bfa284c30a7f265a41ac4cbbc93bdbfc0dfe0613b9cb8a8581b51a9e55",
"sha256:1a4e06ba4f74494ea0c58c24de2bb752818e9d504474ec95b0aa94f6b0a7e479",
"sha256:1c3c707c76be43c9e99cb7e3d5f1bee1c8e5be8b8a2a5eeee665efbf8ddde91a",
"sha256:1fd0b290203e3b0882d9605d807b03c0f47e3440f97824586c173eca0aadd99d",
"sha256:24114e4a6e1870c5a24b1da8f60d0ba77a0b4027907860188ea82bd3508c80eb",
"sha256:258d886a49b6b058cd7abb0ab4b2b85ce78669a857398e83e8b8e28b317b5abb",
"sha256:33c79b6dd6bc7f65079ab9ca5bebffb5f5d1141c689c9c6a7855776d1b09b7e8",
"sha256:367385fc797b2c31564c427430c7a8630db1a00bd040555dfc1d5c52e39fcd72",
"sha256:3c1884ff078fb8bf5f63d7d86921838b82ed4a7d0c027add773c2f38b3168754",
"sha256:44e5240e8f4f8861d748f2a58b3f04daadab5e22bfec896bf5434745f788f33f",
"sha256:46aa988e15f3ea72dddd81afe3839437b755fffddb5e173886f11460be909dce",
"sha256:74d90d499c9c736d52dd6d9b7221af5665b9c04f1767e35f5dd8694324bd4601",
"sha256:809c0a2ce9032cbcd7b5313f71af4bdc5c8c771cb86eb7559afd954cab82ebb5",
"sha256:85d1ef2cdafd5507c4221d201aaf62fc9276f8b0f71bd3933363e62a33abc734",
"sha256:8c3889c7681af77ecfa4431cd42a2885d093ecb811e81fbe5e203abc07e0995b",
"sha256:9218d81b9fca98d2c47d35d688a0cea0c42fd473159dfd5612dcb0483c63e40b",
"sha256:9aa4f3827992288edd37c9df345783a69ef58bd20cc02e64b36e44bcd157bbf1",
"sha256:9d80f44137a70b6f84c750d11019a3419f409c944526a95219bea0ac31f4dd91",
"sha256:b7ebd36128a2fe93991293f997e44be9286503c7530ace6a55b938b20be288d8",
"sha256:c4c78e2c71c257c136cdd43869fd3d5e34fc2162dc22e4a5406b0ebe86958239",
"sha256:c6a842537f887be1fe115d8abb5daa9bc8cc124e455ff995830cc785624a97af",
"sha256:cf0a2e040fdf5a6d95f4c286c6ef1df6b36c218b528c8a9158ec2452a804b9b8",
"sha256:cfd28aad6fc61f7a5d4ee556a997dc6e5555d9381d1390c00ecaf984d57e4232",
"sha256:dca5660e25932771460d4688ccbb515677caaf8595f3f3240ec16c117deff89a",
"sha256:de7aedc85918c2f887886442e50f52c1b93545606317956d65f342bd81cb4fc3",
"sha256:e6c0bbf8e277b74196e3140c35f9a1ae3eafd818f7f2d3a15819c49135d6c062"
"sha256:0804f77cb1e9b6dbd37601cee11283bba39a8d44b9ddb053400c58e0c0d7d9de",
"sha256:0ab7c5b5d04691bcbd570658667dd1e21ca311c62dcfd315ad2255b1cd37f64f",
"sha256:0b3e6cf3ea1f8cecd625f1420b931c83ce74f00c29a0ff1ce4385f99900ac7c4",
"sha256:365c06a45712cd723ec16fa4ceb32ce46ad201eb7bbf6d3c16b063c72b61a3ed",
"sha256:38301fbc0af865baa4752ddae1bb3cbb24b3d8f221bf2850aad96b243306fa03",
"sha256:3aef1af1a91798536bbab35d70d35750bd2884f0832c88aeb2499aa2d1ed4992",
"sha256:3fe0ab49537d9330c9bba7f16a5f8b02da615b5c809cdf7124f356a0f182eccd",
"sha256:45a619d5c1915957449264c81c008934452e3fd3604e36809212300b2a4dab68",
"sha256:49f90f147883a0c3778fd29d3eb169d56416f25758d0f66775db9184debc8010",
"sha256:571b5a758baf1cb6a04233fb23d6cf1ca60b31f9f641b1700bfaab1194020555",
"sha256:5ac381e8b1259925287ccc5a87d9cf6322a2dc88ae28a97fe3e196385288413f",
"sha256:6153db744a743c0c8c91b8e3b9d40e0b13a5d31dbf8a12748c6d9bfd3ddc01ad",
"sha256:6fd63afd14a16f5d6b408f623cc2142917a1f92855f0df997e09a49f0341be8a",
"sha256:70acbcaba2a638923c2d337e0edea210505708d7859b87c2bd81e8f9902ae826",
"sha256:70b1594d56ed32d56ed21a7fbb2a5c6fd7446cdb7b21e749c9791eac3a64d9e4",
"sha256:76638865c83b1bb33bcac2a61ce4d13c17dba2204969dedb9ab60ef62bede686",
"sha256:7b2ec162c87fc496aa568258ac88631a2ce0acfe681a9af40842fc55deaedc99",
"sha256:7cee2cef07c8d76894ebefc54e4bb707dfc7f258ad155bd61d87f6cd487a70ff",
"sha256:7d16d4498f8b374fc625c4037742fbdd7f9ac383fd50b06f4df00c81ef60e829",
"sha256:b50bc1780681b127e28f0075dfb81d6135c3a293e0c1d0211133c75e2179b6c0",
"sha256:bd0582f831ad5bcad6ca001deba4568573a4675437db17c4031939156ff339fa",
"sha256:cfd40d8a4b59f7567620410f966bb1f32dc555b2b19f82a91b147fac296f645c",
"sha256:e3ae410089de680e8f84c68b755b42bc42c0ceb8c03dbea88a5099747091d38e",
"sha256:e9046e559c299b395b39ac7dbf16005308821c2f24a63cae2ab173bd6aa11616",
"sha256:ef6be704ae2bc8ad0ebc5cb850ee9139493b0fc4e81abcc240fb392a63ebc808",
"sha256:f8dc19d92896558f9c4317ee365729ead9d7bbcf2052a9a19a3ef17abbb8ac5b"
],
"version": "==6.0.0"
"version": "==6.1.0"
},
"prompt-toolkit": {
"hashes": [
@ -192,9 +192,9 @@
},
"pyrsistent": {
"hashes": [
"sha256:16692ee739d42cf5e39cef8d27649a8c1fdb7aa99887098f1460057c5eb75c3a"
"sha256:50cffebc87ca91b9d4be2dcc2e479272bcb466b5a0487b6c271f7ddea6917e14"
],
"version": "==0.15.2"
"version": "==0.15.3"
},
"python-dateutil": {
"hashes": [
@ -290,6 +290,12 @@
"sha256:f4ebe71925af7b40a864553f761ed559b43544f8f71746c2d756c7fe788ade7c"
],
"version": "==0.1.7"
},
"wrapt": {
"hashes": [
"sha256:565a021fd19419476b9362b05eeaa094178de64f8361e44468f9e9d7843901e1"
],
"version": "==1.11.2"
}
},
"develop": {

View File

@ -5,7 +5,7 @@ import argparse
import json
try:
from pymisp import MISPEncode
from pymisp import MISPEncode, AbstractMISP
from pymisp.tools import make_binary_objects
except ImportError:
pass
@ -59,6 +59,7 @@ if __name__ == '__main__':
group.add_argument("-p", "--path", help="Path to process.")
group.add_argument("-c", "--check", action='store_true', help="Check the dependencies.")
args = parser.parse_args()
a = AbstractMISP()
if args.check:
print(check())

View File

@ -1,6 +1,5 @@
__version__ = '2.4.111'
import logging
import functools
import warnings
import sys
@ -14,28 +13,26 @@ logger.addHandler(default_handler)
logger.setLevel(logging.WARNING)
def deprecated(func):
'''This is a decorator which can be used to mark functions
as deprecated. It will result in a warning being emitted
when the function is used.'''
def warning_2020():
@functools.wraps(func)
def new_func(*args, **kwargs):
warnings.showwarning(
"Call to deprecated function {}.".format(func.__name__),
category=DeprecationWarning,
filename=func.__code__.co_filename,
lineno=func.__code__.co_firstlineno + 1
)
return func(*args, **kwargs)
return new_func
if sys.version_info < (3, 6):
warnings.warn("""
Python 2.7 is officially end of life the 2020-01-01. For this occasion,
we decided to review which versions of Python we support and our conclusion
is to only support python 3.6+ starting the 2020-01-01.
Every version of pymisp released after the 2020-01-01 will fail if the
python interpreter is prior to python 3.6.
**Please update your codebase.**""", DeprecationWarning, stacklevel=3)
try:
warning_2020()
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse # noqa
from .api import PyMISP # noqa
from .abstract import AbstractMISP, MISPEncode, MISPTag, Distribution, ThreatLevel, Analysis # noqa
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute # noqa
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed # noqa
from .tools import AbstractMISPObjectGenerator # noqa
from .tools import Neo4j # noqa
from .tools import stix # noqa
@ -44,6 +41,7 @@ try:
from .tools import ext_lookups # noqa
if sys.version_info >= (3, 6):
from .aping import ExpandedPyMISP # noqa
# Let's not bother with old python
try:
from .tools import reportlab_generator # noqa
@ -53,8 +51,6 @@ try:
except NameError:
# FIXME: The import should not raise an exception if reportlab isn't installed
pass
if sys.version_info >= (3, 6):
from .aping import ExpandedPyMISP # noqa
logger.debug('pymisp loaded properly')
except ImportError as e:
logger.warning('Unable to load pymisp properly: {}'.format(e))

View File

@ -13,14 +13,13 @@ from .exceptions import PyMISPInvalidFormat
# Try to import MutableMapping the python 3.3+ way
try:
from collections.abc import MutableMapping
except:
except Exception:
pass
logger = logging.getLogger('pymisp')
if sys.version_info < (3, 0):
logger.warning("You're using python 2, it is strongly recommended to use python >=3.6")
from collections import MutableMapping
# This is required because Python 2 is a pain.
@ -74,7 +73,7 @@ class MISPEncode(JSONEncoder):
def default(self, obj):
if isinstance(obj, AbstractMISP):
return obj.jsonable()
elif isinstance(obj, datetime.datetime):
elif isinstance(obj, (datetime.datetime, datetime.date)):
return obj.isoformat()
elif isinstance(obj, Enum):
return obj.value
@ -270,16 +269,17 @@ class AbstractMISP(MutableMapping):
else:
return False
def __repr__(self):
if hasattr(self, 'name'):
return '<{self.__class__.__name__}(name={self.name})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
class MISPTag(AbstractMISP):
def __init__(self):
super(MISPTag, self).__init__()
def from_dict(self, name, **kwargs):
self.name = name
def from_dict(self, **kwargs):
if kwargs.get('Tag'):
kwargs = kwargs.get('Tag')
super(MISPTag, self).from_dict(**kwargs)
def __repr__(self):
if hasattr(self, 'name'):
return '<{self.__class__.__name__}(name={self.name})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -125,6 +125,10 @@
"default_category": "Network activity",
"to_ids": 1
},
"community-id": {
"default_category": "Network activity",
"to_ids": 1
},
"pattern-in-file": {
"default_category": "Payload installation",
"to_ids": 1
@ -666,6 +670,7 @@
"snort",
"bro",
"zeek",
"community-id",
"pattern-in-file",
"pattern-in-traffic",
"pattern-in-memory",
@ -1075,7 +1080,8 @@
"hostname|port",
"bro",
"zeek",
"anonymised"
"anonymised",
"community-id"
],
"Payload type": [
"comment",
@ -1145,7 +1151,8 @@
"github-repository",
"other",
"cortex",
"anonymised"
"anonymised",
"community-id"
],
"Financial fraud": [
"btc",

View File

@ -11,17 +11,15 @@ import sys
import uuid
from collections import defaultdict
from . import deprecated
from deprecated import deprecated
from .abstract import AbstractMISP
from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError
import logging
logger = logging.getLogger('pymisp')
if sys.version_info < (3, 0):
logger.warning("You're using python 2, it is strongly recommended to use python >=3.6")
# This is required because Python 2 is a pain.
from datetime import tzinfo, timedelta
@ -354,23 +352,23 @@ class MISPAttribute(AbstractMISP):
signed, _ = c.sign(to_sign, mode=mode.DETACH)
self.sig = base64.b64encode(signed).decode()
@deprecated
@deprecated(reason="Use self.known_types instead. Removal date: 2020-01-01.")
def get_known_types(self): # pragma: no cover
return self.known_types
@deprecated
@deprecated(reason="Use self.malware_binary instead. Removal date: 2020-01-01.")
def get_malware_binary(self): # pragma: no cover
return self.malware_binary
@deprecated
@deprecated(reason="Use self.to_dict() instead. Removal date: 2020-01-01.")
def _json(self): # pragma: no cover
return self.to_dict()
@deprecated
@deprecated(reason="Use self.to_dict() instead. Removal date: 2020-01-01.")
def _json_full(self): # pragma: no cover
return self.to_dict()
@deprecated
@deprecated(reason="Use self.from_dict(**kwargs) instead. Removal date: 2020-01-01.")
def set_all_values(self, **kwargs): # pragma: no cover
self.from_dict(**kwargs)
@ -782,15 +780,15 @@ class MISPEvent(AbstractMISP):
to_return['global'] = False
return to_return
@deprecated
@deprecated(reason="Use self.known_types instead. Removal date: 2020-01-01.")
def get_known_types(self): # pragma: no cover
return self.known_types
@deprecated
@deprecated(reason="Use self.from_dict(**kwargs) instead. Removal date: 2020-01-01.")
def set_all_values(self, **kwargs): # pragma: no cover
self.from_dict(**kwargs)
@deprecated
@deprecated(reason="Use self.to_dict() instead. Removal date: 2020-01-01.")
def _json(self): # pragma: no cover
return self.to_dict()
@ -800,19 +798,28 @@ class MISPObjectReference(AbstractMISP):
def __init__(self):
super(MISPObjectReference, self).__init__()
def from_dict(self, object_uuid, referenced_uuid, relationship_type, comment=None, **kwargs):
self.object_uuid = object_uuid
self.referenced_uuid = referenced_uuid
self.relationship_type = relationship_type
self.comment = comment
def from_dict(self, **kwargs):
if kwargs.get('ObjectReference'):
kwargs = kwargs.get('ObjectReference')
super(MISPObjectReference, self).from_dict(**kwargs)
def __repr__(self):
if hasattr(self, 'referenced_uuid'):
if hasattr(self, 'referenced_uuid') and hasattr(self, 'object_uuid'):
return '<{self.__class__.__name__}(object_uuid={self.object_uuid}, referenced_uuid={self.referenced_uuid}, relationship_type={self.relationship_type})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
class MISPObjectTemplate(AbstractMISP):
def __init__(self):
super(MISPObjectTemplate, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('ObjectTemplate'):
kwargs = kwargs.get('ObjectTemplate')
super(MISPObjectTemplate, self).from_dict(**kwargs)
class MISPUser(AbstractMISP):
def __init__(self):
@ -840,12 +847,99 @@ class MISPFeed(AbstractMISP):
def __init__(self):
super(MISPFeed, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('Feed'):
kwargs = kwargs.get('Feed')
super(MISPFeed, self).from_dict(**kwargs)
class MISPWarninglist(AbstractMISP):
def __init__(self):
super(MISPWarninglist, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('Warninglist'):
kwargs = kwargs.get('Warninglist')
super(MISPWarninglist, self).from_dict(**kwargs)
class MISPTaxonomy(AbstractMISP):
def __init__(self):
super(MISPTaxonomy, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('Taxonomy'):
kwargs = kwargs.get('Taxonomy')
super(MISPTaxonomy, self).from_dict(**kwargs)
class MISPGalaxy(AbstractMISP):
def __init__(self):
super(MISPGalaxy, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('Galaxy'):
kwargs = kwargs.get('Galaxy')
super(MISPGalaxy, self).from_dict(**kwargs)
class MISPNoticelist(AbstractMISP):
def __init__(self):
super(MISPNoticelist, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('Noticelist'):
kwargs = kwargs.get('Noticelist')
super(MISPNoticelist, self).from_dict(**kwargs)
class MISPRole(AbstractMISP):
def __init__(self):
super(MISPRole, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('Role'):
kwargs = kwargs.get('Role')
super(MISPRole, self).from_dict(**kwargs)
class MISPServer(AbstractMISP):
def __init__(self):
super(MISPServer, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('Server'):
kwargs = kwargs.get('Server')
super(MISPServer, self).from_dict(**kwargs)
class MISPSharingGroup(AbstractMISP):
def __init__(self):
super(MISPSharingGroup, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('SharingGroup'):
kwargs = kwargs.get('SharingGroup')
super(MISPSharingGroup, self).from_dict(**kwargs)
class MISPLog(AbstractMISP):
def __init__(self):
super(MISPLog, self).__init__()
def from_dict(self, **kwargs):
if kwargs.get('Log'):
kwargs = kwargs.get('Log')
super(MISPLog, self).from_dict(**kwargs)
def __repr__(self):
return '<{self.__class__.__name__}({self.model}, {self.action}, {self.title})'.format(self=self)
@ -855,7 +949,7 @@ class MISPSighting(AbstractMISP):
def __init__(self):
super(MISPSighting, self).__init__()
def from_dict(self, value=None, uuid=None, id=None, source=None, type=None, timestamp=None, **kwargs):
def from_dict(self, **kwargs):
"""Initialize the MISPSighting from a dictionary
:value: Value of the attribute the sighting is related too. Pushing this object
will update the sighting count of each attriutes with thifs value on the instance
@ -865,12 +959,8 @@ class MISPSighting(AbstractMISP):
:type: Type of the sighting
:timestamp: Timestamp associated to the sighting
"""
self.value = value
self.uuid = uuid
self.id = id
self.source = source
self.type = type
self.timestamp = timestamp
if kwargs.get('Sighting'):
kwargs = kwargs.get('Sighting')
super(MISPSighting, self).from_dict(**kwargs)
def __repr__(self):
@ -917,7 +1007,6 @@ class MISPObjectAttribute(MISPAttribute):
class MISPShadowAttribute(AbstractMISP):
# NOTE: Kindof a MISPAttribute, but can be lot more lightweight (just one key for example)
def __init__(self):
super(MISPShadowAttribute, self).__init__()
@ -927,6 +1016,11 @@ class MISPShadowAttribute(AbstractMISP):
kwargs = kwargs.get('ShadowAttribute')
super(MISPShadowAttribute, self).from_dict(**kwargs)
def __repr__(self):
if hasattr(self, 'value'):
return '<{self.__class__.__name__}(type={self.type}, value={self.value})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
class MISPObject(AbstractMISP):
@ -1176,12 +1270,3 @@ class MISPObject(AbstractMISP):
if hasattr(self, 'name'):
return '<{self.__class__.__name__}(name={self.name})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
class MISPSharingGroup(AbstractMISP):
def __init__(self):
super(MISPSharingGroup, self).__init__()
def from_dict(self, **kwargs):
super(MISPSharingGroup, self).from_dict(**kwargs)

View File

@ -41,8 +41,8 @@ setup(
],
install_requires=['six', 'requests', 'python-dateutil', 'jsonschema',
'python-dateutil', 'enum34;python_version<"3.4"',
'functools32;python_version<"3.0"'],
extras_require={'fileobjects': ['lief>=0.8', 'python-magic', 'pydeep'],
'functools32;python_version<"3.0"', 'deprecated'],
extras_require={'fileobjects': ['lief>=0.8,<0.10;python_version<"3.5"', 'lief>=0.10.0.dev0;python_version>"3.5"', 'python-magic', 'pydeep'],
'neo': ['py2neo'],
'openioc': ['beautifulsoup4'],
'virustotal': ['validators'],

View File

@ -13,6 +13,7 @@ import re
import json
from pathlib import Path
import urllib3
import time
from uuid import uuid4
@ -20,8 +21,8 @@ import logging
logging.disable(logging.CRITICAL)
try:
from pymisp import ExpandedPyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, MISPAttribute
from pymisp.tools import CSVLoader, DomainIPObject, ASNObject
from pymisp import ExpandedPyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis, MISPObject, MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer
from pymisp.tools import CSVLoader, DomainIPObject, ASNObject, GenericObjectGenerator
except ImportError:
if sys.version_info < (3, 6):
print('This test suite requires Python 3.6+, breaking.')
@ -35,12 +36,15 @@ try:
travis_run = True
except ImportError as e:
print(e)
url = 'http://localhost:8080'
key = 'HRizIMmaxBOXAQSzKZ874rDWUsQEk4vGAGBoljQO'
url = 'https://localhost:8443'
key = 'K5yV0CcxdnklzDfCKlnPniIxrMX41utQ2dG13zZ3'
verifycert = False
travis_run = False
urllib3.disable_warnings()
class TestComprehensive(unittest.TestCase):
@classmethod
@ -58,14 +62,14 @@ class TestComprehensive(unittest.TestCase):
user = MISPUser()
user.email = 'testusr@user.local'
user.org_id = cls.test_org.id
cls.test_usr = cls.admin_misp_connector.add_user(user)
cls.test_usr = cls.admin_misp_connector.add_user(user, pythonify=True)
cls.user_misp_connector = ExpandedPyMISP(url, cls.test_usr.authkey, verifycert, debug=False)
# Creates a publisher
user = MISPUser()
user.email = 'testpub@user.local'
user.org_id = cls.test_org.id
user.role_id = 4
cls.test_pub = cls.admin_misp_connector.add_user(user)
cls.test_pub = cls.admin_misp_connector.add_user(user, pythonify=True)
cls.pub_misp_connector = ExpandedPyMISP(url, cls.test_pub.authkey, verifycert)
# Update all json stuff
cls.admin_misp_connector.update_object_templates()
@ -81,7 +85,7 @@ class TestComprehensive(unittest.TestCase):
# Delete user
cls.admin_misp_connector.delete_user(user_id=cls.test_usr.id)
# Delete org
cls.admin_misp_connector.delete_organisation(org_id=cls.test_org.id)
cls.admin_misp_connector.delete_organisation(organisation_id=cls.test_org.id)
def create_simple_event(self, force_timestamps=False):
mispevent = MISPEvent(force_timestamps=force_timestamps)
@ -497,14 +501,13 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(first.objects[1].distribution, Distribution.inherit.value)
self.assertEqual(first.objects[1].attributes[0].distribution, Distribution.inherit.value)
# Attribute create
attribute = self.user_misp_connector.add_named_attribute(first, 'comment', 'bar')
# FIXME: Add helper that returns a list of MISPAttribute
self.assertEqual(attribute[0]['Attribute']['distribution'], str(Distribution.inherit.value))
attribute = self.user_misp_connector.add_attribute(first.id, {'type': 'comment', 'value': 'bar'}, pythonify=True)
self.assertEqual(attribute.value, 'bar', attribute.to_json())
self.assertEqual(attribute.distribution, Distribution.inherit.value, attribute.to_json())
# Object - add
o = MISPObject('file')
o.add_attribute('filename', value='blah.exe')
new_obj = self.user_misp_connector.add_object(first.id, o)
# FIXME: Add helper that returns a MISPObject
self.assertEqual(new_obj.distribution, int(Distribution.inherit.value))
self.assertEqual(new_obj.attributes[0].distribution, int(Distribution.inherit.value))
# Object - edit
@ -699,6 +702,13 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(len(events), 1)
self.assertIs(events[0].attributes[-1].malware_binary, None)
# Search index
events = self.user_misp_connector.search_index(timestamp=first.timestamp.timestamp(),
pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].info, 'foo bar blah')
self.assertEqual(events[0].attributes, [])
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
@ -712,12 +722,12 @@ class TestComprehensive(unittest.TestCase):
first.attributes[0].comment = 'This is the modified comment'
attribute = self.user_misp_connector.update_attribute(first.attributes[0])
self.assertEqual(attribute.comment, 'This is the modified comment')
attribute = self.user_misp_connector.change_comment(first.attributes[0].uuid, 'This is the modified comment, again')
self.assertEqual(attribute['Attribute']['comment'], 'This is the modified comment, again')
attribute = self.user_misp_connector.change_disable_correlation(first.attributes[0].uuid, True)
self.assertEqual(attribute['Attribute']['disable_correlation'], True)
attribute = self.user_misp_connector.change_disable_correlation(first.attributes[0].uuid, 0)
self.assertEqual(attribute['Attribute']['disable_correlation'], False)
attribute = self.user_misp_connector.update_attribute({'comment': 'This is the modified comment, again'}, attribute.id)
self.assertEqual(attribute.comment, 'This is the modified comment, again')
attribute = self.user_misp_connector.update_attribute({'disable_correlation': True}, attribute.id)
self.assertTrue(attribute.disable_correlation)
attribute = self.user_misp_connector.update_attribute({'disable_correlation': False}, attribute.id)
self.assertFalse(attribute.disable_correlation)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
@ -730,10 +740,15 @@ class TestComprehensive(unittest.TestCase):
second = self.user_misp_connector.add_event(second)
current_ts = int(time.time())
self.user_misp_connector.sighting(value=first.attributes[0].value)
self.user_misp_connector.sighting(value=second.attributes[0].value,
source='Testcases',
type='1')
r = self.user_misp_connector.add_sighting({'value': first.attributes[0].value})
self.assertEqual(r['message'], 'Sighting added')
s = MISPSighting()
s.value = second.attributes[0].value
s.source = 'Testcases'
s.type = '1'
r = self.user_misp_connector.add_sighting(s, second.attributes[0].id)
self.assertEqual(r['message'], 'Sighting added')
s = self.user_misp_connector.search_sightings(publish_timestamp=current_ts, include_attribute=True,
include_event_meta=True, pythonify=True)
@ -747,7 +762,7 @@ class TestComprehensive(unittest.TestCase):
include_event_meta=True,
pythonify=True)
self.assertEqual(len(s), 1)
self.assertEqual(s[0]['event'].id, second.id)
self.assertEqual(s[0]['event'].id, second.id, s)
self.assertEqual(s[0]['attribute'].id, second.attributes[0].id)
s = self.user_misp_connector.search_sightings(publish_timestamp=current_ts,
@ -770,6 +785,19 @@ class TestComprehensive(unittest.TestCase):
pythonify=True)
self.assertEqual(len(s), 1)
self.assertEqual(s[0]['sighting'].attribute_id, str(second.attributes[0].id))
# Get sightings from event/attribute / org
s = self.user_misp_connector.sightings(first, pythonify=True)
self.assertTrue(isinstance(s, list))
self.assertEqual(int(s[0].attribute_id), first.attributes[0].id)
r = self.admin_misp_connector.add_sighting(s, second.attributes[0].id)
self.assertEqual(r['message'], 'Sighting added')
s = self.user_misp_connector.sightings(second.attributes[0], pythonify=True)
self.assertEqual(len(s), 2)
s = self.user_misp_connector.sightings(second.attributes[0], self.test_org.id, pythonify=True)
self.assertEqual(len(s), 1)
self.assertEqual(s[0].org_id, self.test_org.id)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
@ -786,13 +814,13 @@ class TestComprehensive(unittest.TestCase):
first = self.user_misp_connector.add_event(first)
second = self.user_misp_connector.add_event(second)
response = self.user_misp_connector.fast_publish(first.id, alert=False)
response = self.user_misp_connector.publish(first.id, alert=False)
self.assertEqual(response['errors'][1]['message'], 'You do not have permission to use this functionality.')
# Default search, attribute with to_ids == True
first.attributes[0].to_ids = True
first = self.user_misp_connector.update_event(first)
self.admin_misp_connector.fast_publish(first.id, alert=False)
self.admin_misp_connector.publish(first.id, alert=False)
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), pythonify=True)
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
@ -848,6 +876,9 @@ class TestComprehensive(unittest.TestCase):
for k in columns:
self.assertTrue(k in csv[0])
# FIXME Publish is async, if we delete the event too fast, we have an empty one.
# https://github.com/MISP/MISP/issues/4886
time.sleep(10)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
@ -869,6 +900,7 @@ class TestComprehensive(unittest.TestCase):
# Delete event
self.admin_misp_connector.delete_event(first.id)
@unittest.skip("Wait for https://github.com/MISP/MISP/issues/4848")
def test_upload_sample(self):
first = self.create_simple_event()
second = self.create_simple_event()
@ -876,11 +908,8 @@ class TestComprehensive(unittest.TestCase):
try:
# Simple, not executable
first = self.user_misp_connector.add_event(first)
with open('tests/testlive_comprehensive.py', 'rb') as f:
response = self.user_misp_connector.upload_sample(filename='testfile.py', filepath_or_bytes=f.read(),
event_id=first.id)
response = self.user_misp_connector.add_sample_to_event(event_id=first.id, path_to_sample=Path('tests/testlive_comprehensive.py'))
self.assertTrue('message' in response, "Content of response: {}".format(response))
print(response)
self.assertEqual(response['message'], 'Success, saved all attributes.')
first = self.user_misp_connector.get_event(first.id)
self.assertEqual(len(first.objects), 1)
@ -888,8 +917,8 @@ class TestComprehensive(unittest.TestCase):
# Simple, executable
second = self.user_misp_connector.add_event(second)
with open('tests/viper-test-files/test_files/whoami.exe', 'rb') as f:
response = self.user_misp_connector.upload_sample(filename='whoami.exe', filepath_or_bytes=f.read(),
event_id=second.id)
pseudofile = BytesIO(f.read())
response = self.user_misp_connector.add_sample_to_event(event_id=second.id, filename='whoami.exe', pseudofile=pseudofile)
self.assertEqual(response['message'], 'Success, saved all attributes.')
second = self.user_misp_connector.get_event(second.id)
self.assertEqual(len(second.objects), 1)
@ -897,9 +926,7 @@ class TestComprehensive(unittest.TestCase):
third = self.user_misp_connector.add_event(third)
if not travis_run:
# Advanced, executable
with open('tests/viper-test-files/test_files/whoami.exe', 'rb') as f:
response = self.user_misp_connector.upload_sample(filename='whoami.exe', filepath_or_bytes=f.read(),
event_id=third.id, advanced_extraction=True)
response = self.user_misp_connector.add_sample_to_event(event_id=third.id, path_to_sample=Path('tests/viper-test-files/test_files/whoami.exe'), advanced_extraction=True)
self.assertEqual(response['message'], 'Success, saved all attributes.')
third = self.user_misp_connector.get_event(third.id)
self.assertEqual(len(third.objects), 7)
@ -932,19 +959,67 @@ class TestComprehensive(unittest.TestCase):
# Test with add_attributes
second = self.create_simple_event()
ip_dom = MISPObject('domain-ip')
ip_dom.add_attribute('domain', value='google.fr')
ip_dom.add_attribute('domain', value='google.fr', disable_correlation=True)
ip_dom.add_attributes('ip', {'value': '10.8.8.8', 'to_ids': False}, '10.9.8.8')
ip_dom.add_attributes('ip', '11.8.8.8', '11.9.8.8')
second.add_object(ip_dom)
second = self.user_misp_connector.add_event(second)
self.assertEqual(len(second.objects[0].attributes), 5)
self.assertTrue(second.objects[0].attributes[0].disable_correlation)
self.assertFalse(second.objects[0].attributes[1].to_ids)
self.assertTrue(second.objects[0].attributes[2].to_ids)
# Test generic Tag methods
r = self.admin_misp_connector.tag(second, 'generic_tag_test')
self.assertTrue(r['message'].endswith(f'successfully attached to Event({second.id}).'), r['message'])
r = self.admin_misp_connector.untag(second, 'generic_tag_test')
self.assertTrue(r['message'].endswith(f'successfully removed from Event({second.id}).'), r['message'])
# NOTE: object tagging not supported yet
# r = self.admin_misp_connector.tag(second.objects[0].uuid, 'generic_tag_test')
# self.assertTrue(r['message'].endswith(f'successfully attached to Object({second.objects[0].id}).'), r['message'])
# r = self.admin_misp_connector.untag(second.objects[0].uuid, 'generic_tag_test')
# self.assertTrue(r['message'].endswith(f'successfully removed from Object({second.objects[0].id}).'), r['message'])
r = self.admin_misp_connector.tag(second.objects[0].attributes[0].uuid, 'generic_tag_test')
self.assertTrue(r['message'].endswith(f'successfully attached to Attribute({second.objects[0].attributes[0].id}).'), r['message'])
r = self.admin_misp_connector.untag(second.objects[0].attributes[0].uuid, 'generic_tag_test')
self.assertTrue(r['message'].endswith(f'successfully removed from Attribute({second.objects[0].attributes[0].id}).'), r['message'])
# Delete tag to avoid polluting the db
tags = self.admin_misp_connector.tags(pythonify=True)
for t in tags:
if t.name == 'generic_tag_test':
response = self.admin_misp_connector.delete_tag(t.id)
self.assertEqual(response['message'], 'Tag deleted.')
# Test delete object
r = self.user_misp_connector.delete_object(second.objects[0].id)
self.assertEqual(r['message'], 'Object deleted')
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
def test_unknown_template(self):
first = self.create_simple_event()
attributeAsDict = [{'MyCoolAttribute': {'value': 'critical thing', 'type': 'text'}},
{'MyCoolerAttribute': {'value': 'even worse', 'type': 'text', 'disable_correlation': True}}]
misp_object = GenericObjectGenerator('my-cool-template')
misp_object.generate_attributes(attributeAsDict)
first.add_object(misp_object)
blah_object = MISPObject('BLAH_TEST')
blah_object.add_reference(misp_object.uuid, "test relation")
blah_object.add_attribute('transaction-number', value='foo', type="text", disable_correlation=True)
first.add_object(blah_object)
try:
first = self.user_misp_connector.add_event(first)
self.assertEqual(len(first.objects[0].attributes), 2)
self.assertFalse(first.objects[0].attributes[0].disable_correlation)
self.assertTrue(first.objects[0].attributes[1].disable_correlation)
self.assertTrue(first.objects[1].attributes[0].disable_correlation)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
def test_domain_ip_object(self):
first = self.create_simple_event()
try:
@ -975,24 +1050,38 @@ class TestComprehensive(unittest.TestCase):
def test_object_template(self):
r = self.admin_misp_connector.update_object_templates()
self.assertEqual(type(r), list)
if not travis_run:
template = self.admin_misp_connector.get_object_template('688c46fb-5edb-40a3-8273-1af7923e2215')
self.assertEqual(template['ObjectTemplate']['uuid'], '688c46fb-5edb-40a3-8273-1af7923e2215')
object_templates = self.admin_misp_connector.object_templates(pythonify=True)
self.assertTrue(isinstance(object_templates, list))
for object_template in object_templates:
if object_template.name == 'file':
break
template = self.admin_misp_connector.get_object_template(object_template.uuid, pythonify=True)
self.assertEqual(template.name, 'file')
def test_tags(self):
# Get list
tags = self.admin_misp_connector.get_tags_list()
tags = self.admin_misp_connector.tags(pythonify=True)
self.assertTrue(isinstance(tags, list))
# Get tag
for tag in tags:
if not tag['hide_tag']:
if not tag.hide_tag:
break
tag = self.admin_misp_connector.get_tag(tags[0]['id'])
tag = self.admin_misp_connector.get_tag(tag.id, pythonify=True)
self.assertTrue('name' in tag)
r = self.admin_misp_connector.disable_tag(tag['id'])
self.assertTrue(r['Tag']['hide_tag'])
r = self.admin_misp_connector.enable_tag(tag['id'])
self.assertFalse(r['Tag']['hide_tag'])
# Enable by MISPTag
tag = self.admin_misp_connector.disable_tag(tag, pythonify=True)
self.assertTrue(tag.hide_tag)
tag = self.admin_misp_connector.enable_tag(tag, pythonify=True)
self.assertFalse(tag.hide_tag)
# Add tag
tag = MISPTag()
tag.name = 'this is a test tag'
new_tag = self.admin_misp_connector.add_tag(tag, pythonify=True)
self.assertEqual(new_tag.name, tag.name)
# Delete tag
response = self.admin_misp_connector.delete_tag(new_tag.id)
self.assertEqual(response['message'], 'Tag deleted.')
def test_add_event_with_attachment_object_controller(self):
first = self.create_simple_event()
@ -1006,16 +1095,20 @@ class TestComprehensive(unittest.TestCase):
r = self.user_misp_connector.add_object(first.id, peo)
self.assertEqual(r.name, 'pe', r)
for ref in peo.ObjectReference:
r = self.user_misp_connector.add_object_reference(ref)
self.assertTrue('ObjectReference' in r, r)
r = self.user_misp_connector.add_object_reference(ref, pythonify=True)
# FIXME: https://github.com/MISP/MISP/issues/4866
# self.assertEqual(r.object_uuid, peo.uuid, r.to_json())
r = self.user_misp_connector.add_object(first.id, fo)
obj_attrs = r.get_attributes_by_relation('ssdeep')
self.assertEqual(len(obj_attrs), 1, obj_attrs)
self.assertEqual(r.name, 'file', r)
for ref in fo.ObjectReference:
r = self.user_misp_connector.add_object_reference(ref)
self.assertTrue('ObjectReference' in r, r)
r = self.user_misp_connector.add_object_reference(fo.ObjectReference[0], pythonify=True)
# FIXME: https://github.com/MISP/MISP/issues/4866
# self.assertEqual(r.object_uuid, fo.uuid, r.to_json())
self.assertEqual(r.referenced_uuid, peo.uuid, r.to_json())
r = self.user_misp_connector.delete_object_reference(r.id)
self.assertEqual(r['message'], 'ObjectReference deleted')
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
@ -1043,19 +1136,22 @@ class TestComprehensive(unittest.TestCase):
r = self.admin_misp_connector.update_taxonomies()
self.assertEqual(r['name'], 'All taxonomy libraries are up to date already.')
# Get list
taxonomies = self.admin_misp_connector.get_taxonomies_list()
taxonomies = self.admin_misp_connector.taxonomies(pythonify=True)
self.assertTrue(isinstance(taxonomies, list))
list_name_test = 'tlp'
for tax in taxonomies:
if tax['Taxonomy']['namespace'] == list_name_test:
if tax.namespace == list_name_test:
break
if not travis_run:
r = self.admin_misp_connector.get_taxonomy(tax['Taxonomy']['id'])
self.assertEqual(r['Taxonomy']['namespace'], list_name_test)
self.assertTrue('enabled' in r['Taxonomy'])
r = self.admin_misp_connector.enable_taxonomy(tax['Taxonomy']['id'])
r = self.admin_misp_connector.get_taxonomy(tax.id, pythonify=True)
self.assertEqual(r.namespace, list_name_test)
self.assertTrue('enabled' in r)
r = self.admin_misp_connector.enable_taxonomy(tax.id)
self.assertEqual(r['message'], 'Taxonomy enabled')
r = self.admin_misp_connector.disable_taxonomy(tax['Taxonomy']['id'])
r = self.admin_misp_connector.enable_taxonomy_tags(tax.id)
# FIXME: https://github.com/MISP/MISP/issues/4865
# self.assertEqual(r, [])
r = self.admin_misp_connector.disable_taxonomy(tax.id)
self.assertEqual(r['message'], 'Taxonomy disabled')
def test_warninglists(self):
@ -1067,21 +1163,24 @@ class TestComprehensive(unittest.TestCase):
except Exception:
print(r)
# Get list
r = self.admin_misp_connector.get_warninglists()
# FIXME It returns Warninglists object instead of a list of warning lists directly. This is inconsistent.
warninglists = r['Warninglists']
warninglists = self.admin_misp_connector.warninglists(pythonify=True)
self.assertTrue(isinstance(warninglists, list))
list_name_test = 'List of known hashes with common false-positives (based on Florian Roth input list)'
for wl in warninglists:
if wl['Warninglist']['name'] == list_name_test:
if wl.name == list_name_test:
break
testwl = wl['Warninglist']
r = self.admin_misp_connector.get_warninglist(testwl['id'])
self.assertEqual(r['Warninglist']['name'], list_name_test)
self.assertTrue('WarninglistEntry' in r['Warninglist'])
r = self.admin_misp_connector.enable_warninglist(testwl['id'])
testwl = wl
r = self.admin_misp_connector.get_warninglist(testwl.id, pythonify=True)
self.assertEqual(r.name, list_name_test)
self.assertTrue('WarninglistEntry' in r)
r = self.admin_misp_connector.enable_warninglist(testwl.id)
self.assertEqual(r['success'], '1 warninglist(s) enabled')
r = self.admin_misp_connector.disable_warninglist(testwl['id'])
# Check if a value is in a warning list
md5_empty_file = 'd41d8cd98f00b204e9800998ecf8427e'
r = self.user_misp_connector.values_in_warninglist([md5_empty_file])
self.assertEqual(r[md5_empty_file][0]['name'], list_name_test)
r = self.admin_misp_connector.disable_warninglist(testwl.id)
self.assertEqual(r['success'], '1 warninglist(s) disabled')
def test_noticelists(self):
@ -1089,20 +1188,21 @@ class TestComprehensive(unittest.TestCase):
r = self.admin_misp_connector.update_noticelists()
self.assertEqual(r['name'], 'All noticelists are up to date already.')
# Get list
noticelists = self.admin_misp_connector.get_noticelists()
noticelists = self.admin_misp_connector.noticelists(pythonify=True)
self.assertTrue(isinstance(noticelists, list))
list_name_test = 'gdpr'
for nl in noticelists:
if nl['Noticelist']['name'] == list_name_test:
if nl.name == list_name_test:
break
testnl = nl
r = self.admin_misp_connector.get_noticelist(testnl['Noticelist']['id'])
self.assertEqual(r['Noticelist']['name'], list_name_test)
self.assertTrue('NoticelistEntry' in r['Noticelist'])
r = self.admin_misp_connector.enable_noticelist(testnl['Noticelist']['id'])
self.assertTrue(r['Noticelist']['enabled'])
r = self.admin_misp_connector.disable_noticelist(testnl['Noticelist']['id'])
self.assertFalse(r['Noticelist']['enabled'])
r = self.admin_misp_connector.get_noticelist(testnl.id, pythonify=True)
self.assertEqual(r.name, list_name_test)
# FIXME: https://github.com/MISP/MISP/issues/4856
self.assertTrue('NoticelistEntry' in r)
r = self.admin_misp_connector.enable_noticelist(testnl.id)
self.assertTrue(r['Noticelist']['enabled'], r)
r = self.admin_misp_connector.disable_noticelist(testnl.id)
self.assertFalse(r['Noticelist']['enabled'], r)
def test_galaxies(self):
if not travis_run:
@ -1110,22 +1210,23 @@ class TestComprehensive(unittest.TestCase):
r = self.admin_misp_connector.update_galaxies()
self.assertEqual(r['name'], 'Galaxies updated.')
# Get list
galaxies = self.admin_misp_connector.get_galaxies()
galaxies = self.admin_misp_connector.galaxies(pythonify=True)
self.assertTrue(isinstance(galaxies, list))
list_name_test = 'Mobile Attack - Attack Pattern'
for galaxy in galaxies:
if galaxy['Galaxy']['name'] == list_name_test:
if galaxy.name == list_name_test:
break
r = self.admin_misp_connector.get_galaxy(galaxy['Galaxy']['id'])
self.assertEqual(r['Galaxy']['name'], list_name_test)
self.assertTrue('GalaxyCluster' in r)
r = self.admin_misp_connector.get_galaxy(galaxy.id, pythonify=True)
self.assertEqual(r.name, list_name_test)
# FIXME: Fails due to https://github.com/MISP/MISP/issues/4855
# self.assertTrue('GalaxyCluster' in r)
def test_zmq(self):
first = self.create_simple_event()
try:
first = self.user_misp_connector.add_event(first)
if not travis_run:
r = self.admin_misp_connector.pushEventToZMQ(first.id)
r = self.admin_misp_connector.push_event_to_ZMQ(first.id)
self.assertEqual(r['message'], 'Event published to ZMQ')
finally:
# Delete event
@ -1150,11 +1251,44 @@ class TestComprehensive(unittest.TestCase):
self.admin_misp_connector.delete_event(first.id)
def test_user(self):
user = self.user_misp_connector.get_user()
# Get list
users = self.admin_misp_connector.users(pythonify=True)
self.assertTrue(isinstance(users, list))
users_email = 'testusr@user.local'
for user in users:
if user.email == users_email:
break
self.assertEqual(user.email, users_email)
# get user
user = self.user_misp_connector.get_user(pythonify=True)
self.assertEqual(user.authkey, self.test_usr.authkey)
# Update user
user.email = 'foo@bar.de'
user = self.admin_misp_connector.update_user(user, pythonify=True)
self.assertEqual(user.email, 'foo@bar.de')
def test_organisation(self):
# Get list
orgs = self.admin_misp_connector.organisations(pythonify=True)
self.assertTrue(isinstance(orgs, list))
org_name = 'ORGNAME'
for org in orgs:
if org.name == org_name:
break
self.assertEqual(org.name, org_name)
# Get org
organisation = self.user_misp_connector.get_organisation(self.test_usr.org_id)
self.assertEqual(organisation.name, 'Test Org')
# Update org
organisation.name = 'blah'
organisation = self.admin_misp_connector.update_organisation(organisation)
self.assertEqual(organisation.name, 'blah')
def test_attribute(self):
first = self.create_simple_event()
second = self.create_simple_event()
second.add_attribute('ip-src', '11.11.11.11')
second.distribution = Distribution.all_communities
try:
first = self.user_misp_connector.add_event(first)
# Get attribute
@ -1178,8 +1312,11 @@ class TestComprehensive(unittest.TestCase):
new_attribute = self.user_misp_connector.update_attribute(new_attribute)
self.assertEqual(new_attribute.value, '5.6.3.4')
# Update attribute as proposal
new_proposal_update = self.user_misp_connector.update_attribute_proposal(new_attribute.id, {'to_ids': False})
new_proposal_update = self.user_misp_connector.update_attribute_proposal(new_attribute.id, {'to_ids': False}, pythonify=True)
self.assertEqual(new_proposal_update.to_ids, False)
# Delete attribute as proposal
proposal_delete = self.user_misp_connector.delete_attribute_proposal(new_attribute.id)
self.assertTrue(proposal_delete['saved'])
# Get attribute proposal
temp_new_proposal = self.user_misp_connector.get_attribute_proposal(new_proposal.id)
self.assertEqual(temp_new_proposal.uuid, new_proposal.uuid)
@ -1198,35 +1335,288 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(response['message'], 'Proposal discarded.')
attribute = self.user_misp_connector.get_attribute(new_attribute.id)
self.assertEqual(attribute.to_ids, False)
# Test fallback to proposal if the user doesn't own the event
second = self.admin_misp_connector.add_event(second, pythonify=True)
# FIXME: attribute needs to be a complete MISPAttribute: https://github.com/MISP/MISP/issues/4868
prop_attr = MISPAttribute()
prop_attr.from_dict(**{'type': 'ip-dst', 'value': '123.43.32.21'})
attribute = self.user_misp_connector.add_attribute(second.id, prop_attr)
self.assertTrue(isinstance(attribute, MISPShadowAttribute))
attribute = self.user_misp_connector.update_attribute({'comment': 'blah'}, second.attributes[0].id)
self.assertTrue(isinstance(attribute, MISPShadowAttribute))
self.assertEqual(attribute.value, second.attributes[0].value)
response = self.user_misp_connector.delete_attribute(second.attributes[1].id)
self.assertTrue(response['success'])
response = self.admin_misp_connector.delete_attribute(second.attributes[1].id)
self.assertEqual(response['message'], 'Attribute deleted.')
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
@unittest.skip("Currently failing")
def test_search_type_event_csv(self):
try:
first, second, third = self.environment()
# Search as admin
events = self.admin_misp_connector.search(return_format='csv', timestamp=first.timestamp.timestamp())
print(events)
events = self.admin_misp_connector.search(return_format='csv', timestamp=first.timestamp.timestamp(), pythonify=True)
self.assertTrue(isinstance(events, list))
self.assertEqual(len(events), 8)
attributes_types_search = self.admin_misp_connector.build_complex_query(or_parameters=['ip-src', 'ip-dst'])
events = self.admin_misp_connector.search(return_format='csv', timestamp=first.timestamp.timestamp(),
type_attribute=attributes_types_search)
print(events)
type_attribute=attributes_types_search, pythonify=True)
self.assertTrue(isinstance(events, list))
self.assertEqual(len(events), 6)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
def test_search_logs(self):
# FIXME: https://github.com/MISP/MISP/issues/4872
r = self.admin_misp_connector.search_logs(model='User', created=date.today(), pythonify=True)
for entry in r[-2:]:
self.assertEqual(entry.action, 'add')
def test_live_acl(self):
missing_acls = self.admin_misp_connector.get_live_query_acl()
missing_acls = self.admin_misp_connector.remote_acl
self.assertEqual(missing_acls, [], msg=missing_acls)
def test_roles(self):
role = self.admin_misp_connector.set_default_role(4)
self.assertEqual(role['message'], 'Default role set.')
self.admin_misp_connector.set_default_role(3)
roles = self.admin_misp_connector.roles(pythonify=True)
self.assertTrue(isinstance(roles, list))
def test_describe_types(self):
remote = self.admin_misp_connector.describe_types_remote
local = self.admin_misp_connector.describe_types_local
self.assertDictEqual(remote, local)
def test_versions(self):
self.assertEqual(self.user_misp_connector.version, self.user_misp_connector.pymisp_version_master)
self.assertEqual(self.user_misp_connector.misp_instance_version['version'],
self.user_misp_connector.misp_instance_version_master['version'])
def test_statistics(self):
try:
# Attributes
first, second, third = self.environment()
expected_attr_stats = {'ip-dst': '2', 'ip-src': '1', 'text': '5'}
attr_stats = self.admin_misp_connector.attributes_statistics()
self.assertDictEqual(attr_stats, expected_attr_stats)
expected_attr_stats_percent = {'ip-dst': '25%', 'ip-src': '12.5%', 'text': '62.5%'}
attr_stats = self.admin_misp_connector.attributes_statistics(percentage=True)
self.assertDictEqual(attr_stats, expected_attr_stats_percent)
expected_attr_stats_category_percent = {'Network activity': '37.5%', 'Other': '62.5%'}
attr_stats = self.admin_misp_connector.attributes_statistics(context='category', percentage=True)
self.assertDictEqual(attr_stats, expected_attr_stats_category_percent)
# Tags
to_test = {'tags': {'tlp:white___test': '1'}, 'taxonomies': {'workflow': 0}}
tags_stats = self.admin_misp_connector.tags_statistics()
self.assertDictEqual(tags_stats, to_test)
to_test = {'tags': {'tlp:white___test': '100%'}, 'taxonomies': {'workflow': '0%'}}
tags_stats = self.admin_misp_connector.tags_statistics(percentage=True, name_sort=True)
self.assertDictEqual(tags_stats, to_test)
# Users
to_test = {'stats': {'event_count': 3, 'event_count_month': 3, 'attribute_count': 8,
'attribute_count_month': 8, 'attributes_per_event': 3, 'correlation_count': 1,
'proposal_count': 0, 'user_count': 3, 'user_count_pgp': 0, 'org_count': 2,
'local_org_count': 2, 'average_user_per_org': 1.5, 'thread_count': 0,
'thread_count_month': 0, 'post_count': 0, 'post_count_month': 0}}
users_stats = self.admin_misp_connector.users_statistics(context='data')
self.assertDictEqual(users_stats, to_test)
users_stats = self.admin_misp_connector.users_statistics(context='orgs')
self.assertTrue('ORGNAME' in list(users_stats.keys()))
users_stats = self.admin_misp_connector.users_statistics(context='users')
self.assertEqual(list(users_stats.keys()), ['user', 'org_local', 'org_external'])
users_stats = self.admin_misp_connector.users_statistics(context='tags')
self.assertEqual(list(users_stats.keys()), ['flatData', 'treemap'])
# FIXME: https://github.com/MISP/MISP/issues/4880
# users_stats = self.admin_misp_connector.users_statistics(context='attributehistogram')
self.user_misp_connector.add_sighting({'value': first.attributes[0].value})
users_stats = self.user_misp_connector.users_statistics(context='sightings')
self.assertEqual(list(users_stats.keys()), ['toplist', 'eventids'])
users_stats = self.admin_misp_connector.users_statistics(context='galaxyMatrix')
self.assertTrue('matrix' in users_stats)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
self.admin_misp_connector.delete_event(third.id)
def test_direct(self):
try:
r = self.user_misp_connector.direct_call('events/add', data={'info': 'foo'})
event = MISPEvent()
event.from_dict(**r)
r = self.user_misp_connector.direct_call(f'events/view/{event.id}')
event_get = MISPEvent()
event_get.from_dict(**r)
self.assertDictEqual(event.to_dict(), event_get.to_dict())
finally:
self.admin_misp_connector.delete_event(event.id)
def test_freetext(self):
first = self.create_simple_event()
try:
self.admin_misp_connector.toggle_warninglist(warninglist_name='%dns resolv%', force_enable=True)
first = self.user_misp_connector.add_event(first)
r = self.user_misp_connector.freetext(first.id, '1.1.1.1 foo@bar.de', adhereToWarninglists=False,
distribution=2, returnMetaAttributes=False, pythonify=True)
self.assertTrue(isinstance(r, list))
self.assertEqual(r[0].value, '1.1.1.1')
# FIXME: https://github.com/MISP/MISP/issues/4881
# r_wl = self.user_misp_connector.freetext(first.id, '8.8.8.8 foo@bar.de', adhereToWarninglists=True,
# distribution=2, returnMetaAttributes=False)
# print(r_wl)
r = self.user_misp_connector.freetext(first.id, '1.1.1.1 foo@bar.de', adhereToWarninglists=True,
distribution=2, returnMetaAttributes=True)
self.assertTrue(isinstance(r, list))
self.assertTrue(isinstance(r[0]['types'], dict))
# NOTE: required, or the attributes are inserted *after* the event is deleted
# FIXME: https://github.com/MISP/MISP/issues/4886
time.sleep(10)
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
def test_sharing_groups(self):
# add
sg = MISPSharingGroup()
sg.name = 'Testcases SG'
sg.releasability = 'Testing'
sharing_group = self.admin_misp_connector.add_sharing_group(sg, pythonify=True)
self.assertEqual(sharing_group.name, 'Testcases SG')
self.assertEqual(sharing_group.releasability, 'Testing')
# add org
# FIXME: https://github.com/MISP/MISP/issues/4884
# r = self.admin_misp_connector.add_org_to_sharing_group(sharing_group.id,
# organisation_id=self.test_org.id, extend=True)
# delete org
# FIXME: https://github.com/MISP/MISP/issues/4884
# r = self.admin_misp_connector.remove_org_from_sharing_group(sharing_group.id,
# organisation_id=self.test_org.id)
# Get list
sharing_groups = self.admin_misp_connector.sharing_groups(pythonify=True)
self.assertTrue(isinstance(sharing_groups, list))
self.assertEqual(sharing_groups[0].name, 'Testcases SG')
# Use the SG
first = self.create_simple_event()
try:
first = self.user_misp_connector.add_event(first)
first = self.admin_misp_connector.change_sharing_group_on_entity(first, sharing_group.id)
self.assertEqual(first.SharingGroup['name'], 'Testcases SG')
# FIXME https://github.com/MISP/MISP/issues/4891
# first_attribute = self.admin_misp_connector.change_sharing_group_on_entity(first.attributes[0], sharing_group.id)
# self.assertEqual(first_attribute.SharingGroup['name'], 'Testcases SG')
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
# delete
r = self.admin_misp_connector.delete_sharing_group(sharing_group.id)
self.assertEqual(r['message'], 'SharingGroup deleted')
def test_feeds(self):
# Add
feed = MISPFeed()
feed.name = 'TestFeed'
feed.provider = 'TestFeed - Provider'
feed.url = 'http://example.com'
feed = self.admin_misp_connector.add_feed(feed, pythonify=True)
self.assertEqual(feed.name, 'TestFeed')
self.assertEqual(feed.url, 'http://example.com')
# Update
feed.name = 'TestFeed - Update'
feed = self.admin_misp_connector.update_feed(feed, pythonify=True)
self.assertEqual(feed.name, 'TestFeed - Update')
# Delete
r = self.admin_misp_connector.delete_feed(feed.id)
self.assertEqual(r['message'], 'Feed deleted.')
# List
feeds = self.admin_misp_connector.feeds(pythonify=True)
self.assertTrue(isinstance(feeds, list))
for feed in feeds:
if feed.name == 'The Botvrij.eu Data':
break
# Get
botvrij = self.admin_misp_connector.get_feed(feed.id, pythonify=True)
self.assertEqual(botvrij.url, "http://www.botvrij.eu/data/feed-osint")
# Enable
# MISP OSINT
feed = self.admin_misp_connector.enable_feed(feeds[0].id, pythonify=True)
self.assertTrue(feed.enabled)
feed = self.admin_misp_connector.enable_feed_cache(feeds[0].id, pythonify=True)
self.assertTrue(feed.caching_enabled)
# Botvrij.eu
feed = self.admin_misp_connector.enable_feed(botvrij.id, pythonify=True)
self.assertTrue(feed.enabled)
feed = self.admin_misp_connector.enable_feed_cache(botvrij.id, pythonify=True)
self.assertTrue(feed.caching_enabled)
# Cache
r = self.admin_misp_connector.cache_feed(botvrij.id)
self.assertEqual(r['message'], 'Feed caching job initiated.')
# Fetch
# Cannot test that, it fetches all the events.
# r = self.admin_misp_connector.fetch_feed(botvrij.id)
# FIXME https://github.com/MISP/MISP/issues/4834#issuecomment-511889274
# self.assertEqual(r['message'], 'Feed caching job initiated.')
# Cache all enabled feeds
r = self.admin_misp_connector.cache_all_feeds()
self.assertEqual(r['message'], 'Feed caching job initiated.')
# Compare all enabled feeds
r = self.admin_misp_connector.compare_feeds()
# FIXME: https://github.com/MISP/MISP/issues/4834#issuecomment-511890466
# self.assertEqual(r['message'], 'Feed caching job initiated.')
time.sleep(30)
# Disable both feeds
feed = self.admin_misp_connector.disable_feed(feeds[0].id, pythonify=True)
self.assertFalse(feed.enabled)
feed = self.admin_misp_connector.disable_feed(botvrij.id, pythonify=True)
self.assertFalse(feed.enabled)
feed = self.admin_misp_connector.disable_feed_cache(feeds[0].id, pythonify=True)
self.assertFalse(feed.enabled)
feed = self.admin_misp_connector.disable_feed_cache(botvrij.id, pythonify=True)
self.assertFalse(feed.enabled)
def test_servers(self):
# add
server = MISPServer()
server.name = 'Test Server'
server.url = 'https://127.0.0.1'
server.remote_org_id = 1
server.authkey = key
server = self.admin_misp_connector.add_server(server, pythonify=True)
self.assertEqual(server.name, 'Test Server')
# Update
server.name = 'Updated name'
server = self.admin_misp_connector.update_server(server, pythonify=True)
self.assertEqual(server.name, 'Updated name')
# List
servers = self.admin_misp_connector.servers(pythonify=True)
self.assertEqual(servers[0].name, 'Updated name')
# Delete
server = self.admin_misp_connector.delete_server(server.id)
# FIXME: https://github.com/MISP/MISP/issues/4889
def test_upload_stix(self):
# FIXME https://github.com/MISP/MISP/issues/4892
pass
if __name__ == '__main__':