mirror of https://github.com/MISP/PyMISP
new: Remove python < 3.6 support.
parent
fac748dd4c
commit
24a8f90ea8
|
@ -12,12 +12,6 @@ addons:
|
|||
|
||||
matrix:
|
||||
include:
|
||||
- name: "Python 2.7 - legacy"
|
||||
python: 2.7
|
||||
env: LEGACY=true
|
||||
- name: "Python 3.5"
|
||||
python: 3.5
|
||||
dist: xenial
|
||||
- name: "Python 3.6"
|
||||
python: 3.6
|
||||
dist: xenial
|
||||
|
|
|
@ -68,11 +68,11 @@
|
|||
},
|
||||
"importlib-metadata": {
|
||||
"hashes": [
|
||||
"sha256:b044f07694ef14a6683b097ba56bd081dbc7cdc7c7fe46011e499dfecc082f21",
|
||||
"sha256:e6ac600a142cf2db707b1998382cc7fc3b02befb7273876e01b8ad10b9652742"
|
||||
"sha256:073a852570f92da5f744a3472af1b61e28e9f78ccf0c9117658dc32b15de7b45",
|
||||
"sha256:d95141fbfa7ef2ec65cfd945e2af7e5a6ddbd7c8d9a25e66ff3be8e3daf9f60f"
|
||||
],
|
||||
"markers": "python_version < '3.8'",
|
||||
"version": "==1.1.0"
|
||||
"version": "==1.3.0"
|
||||
},
|
||||
"jsonschema": {
|
||||
"hashes": [
|
||||
|
@ -102,10 +102,10 @@
|
|||
},
|
||||
"more-itertools": {
|
||||
"hashes": [
|
||||
"sha256:53ff73f186307d9c8ef17a9600309154a6ae27f25579e80af4db8f047ba14bc2",
|
||||
"sha256:a0ea684c39bc4315ba7aae406596ef191fd84f873d2d2751f84d64e81a7a2d45"
|
||||
"sha256:b84b238cce0d9adad5ed87e745778d20a3f8487d0f0cb8b8a586816c7496458d",
|
||||
"sha256:c833ef592a0324bcc6a60e48440da07645063c453880c9477ceb22490aec1564"
|
||||
],
|
||||
"version": "==8.0.0"
|
||||
"version": "==8.0.2"
|
||||
},
|
||||
"pillow": {
|
||||
"hashes": [
|
||||
|
@ -246,9 +246,9 @@
|
|||
},
|
||||
"validators": {
|
||||
"hashes": [
|
||||
"sha256:f0ac832212e3ee2e9b10e156f19b106888cf1429c291fbc5297aae87685014ae"
|
||||
"sha256:0bfe836a1af37bb266d71ec1e98b530c38ce11bc7fbe0c4c96ef7b1532d019e5"
|
||||
],
|
||||
"version": "==0.14.0"
|
||||
"version": "==0.14.1"
|
||||
},
|
||||
"wrapt": {
|
||||
"hashes": [
|
||||
|
@ -325,10 +325,10 @@
|
|||
},
|
||||
"colorama": {
|
||||
"hashes": [
|
||||
"sha256:05eed71e2e327246ad6b38c540c4a3117230b19679b875190486ddd2d721422d",
|
||||
"sha256:f8ac84de7840f5b9c4e3347b3c1eaa50f7e49c2b07596221daec5edaabbd7c48"
|
||||
"sha256:7d73d2a99753107a36ac6b455ee49046802e59d9d076ef8e47b61499fa29afff",
|
||||
"sha256:e96da0d330793e2cb9485e9ddfd918d456036c7149416295932478192f4436a1"
|
||||
],
|
||||
"version": "==0.4.1"
|
||||
"version": "==0.4.3"
|
||||
},
|
||||
"commonmark": {
|
||||
"hashes": [
|
||||
|
@ -376,11 +376,11 @@
|
|||
},
|
||||
"coveralls": {
|
||||
"hashes": [
|
||||
"sha256:9bc5a1f92682eef59f688a8f280207190d9a6afb84cef8f567fa47631a784060",
|
||||
"sha256:fb51cddef4bc458de347274116df15d641a735d3f0a580a9472174e2e62f408c"
|
||||
"sha256:25522a50cdf720d956601ca6ef480786e655ae2f0c94270c77e1a23d742de558",
|
||||
"sha256:8e3315e8620bb6b3c6f3179a75f498e7179c93b3ddc440352404f941b1f70524"
|
||||
],
|
||||
"index": "pypi",
|
||||
"version": "==1.8.2"
|
||||
"version": "==1.9.2"
|
||||
},
|
||||
"decorator": {
|
||||
"hashes": [
|
||||
|
@ -426,11 +426,11 @@
|
|||
},
|
||||
"importlib-metadata": {
|
||||
"hashes": [
|
||||
"sha256:b044f07694ef14a6683b097ba56bd081dbc7cdc7c7fe46011e499dfecc082f21",
|
||||
"sha256:e6ac600a142cf2db707b1998382cc7fc3b02befb7273876e01b8ad10b9652742"
|
||||
"sha256:073a852570f92da5f744a3472af1b61e28e9f78ccf0c9117658dc32b15de7b45",
|
||||
"sha256:d95141fbfa7ef2ec65cfd945e2af7e5a6ddbd7c8d9a25e66ff3be8e3daf9f60f"
|
||||
],
|
||||
"markers": "python_version < '3.8'",
|
||||
"version": "==1.1.0"
|
||||
"version": "==1.3.0"
|
||||
},
|
||||
"jinja2": {
|
||||
"hashes": [
|
||||
|
@ -507,10 +507,10 @@
|
|||
},
|
||||
"more-itertools": {
|
||||
"hashes": [
|
||||
"sha256:53ff73f186307d9c8ef17a9600309154a6ae27f25579e80af4db8f047ba14bc2",
|
||||
"sha256:a0ea684c39bc4315ba7aae406596ef191fd84f873d2d2751f84d64e81a7a2d45"
|
||||
"sha256:b84b238cce0d9adad5ed87e745778d20a3f8487d0f0cb8b8a586816c7496458d",
|
||||
"sha256:c833ef592a0324bcc6a60e48440da07645063c453880c9477ceb22490aec1564"
|
||||
],
|
||||
"version": "==8.0.0"
|
||||
"version": "==8.0.2"
|
||||
},
|
||||
"neobolt": {
|
||||
"hashes": [
|
||||
|
@ -740,10 +740,10 @@
|
|||
},
|
||||
"sphinx": {
|
||||
"hashes": [
|
||||
"sha256:31088dfb95359384b1005619827eaee3056243798c62724fd3fa4b84ee4d71bd",
|
||||
"sha256:52286a0b9d7caa31efee301ec4300dbdab23c3b05da1c9024b4e84896fb73d79"
|
||||
"sha256:0a11e2fd31fe5c7e64b4fc53c2c022946512f021d603eb41ac6ae51d5fcbb574",
|
||||
"sha256:138e39aa10f28d52aa5759fc6d1cba2be6a4b750010974047fa7d0e31addcf63"
|
||||
],
|
||||
"version": "==2.2.1"
|
||||
"version": "==2.3.0"
|
||||
},
|
||||
"sphinx-autodoc-typehints": {
|
||||
"hashes": [
|
||||
|
@ -803,9 +803,9 @@
|
|||
},
|
||||
"validators": {
|
||||
"hashes": [
|
||||
"sha256:f0ac832212e3ee2e9b10e156f19b106888cf1429c291fbc5297aae87685014ae"
|
||||
"sha256:0bfe836a1af37bb266d71ec1e98b530c38ce11bc7fbe0c4c96ef7b1532d019e5"
|
||||
],
|
||||
"version": "==0.14.0"
|
||||
"version": "==0.14.1"
|
||||
},
|
||||
"wcwidth": {
|
||||
"hashes": [
|
||||
|
|
|
@ -13,24 +13,18 @@ logger.addHandler(default_handler)
|
|||
logger.setLevel(logging.WARNING)
|
||||
|
||||
|
||||
def warning_2020():
|
||||
|
||||
if sys.version_info < (3, 6):
|
||||
warnings.warn("""
|
||||
Python 2.7 is officially end of life the 2020-01-01. For this occasion,
|
||||
we decided to review which versions of Python we support and our conclusion
|
||||
is to only support python 3.6+ starting the 2020-01-01.
|
||||
|
||||
Every version of pymisp released after the 2020-01-01 will fail if the
|
||||
python interpreter is prior to python 3.6.
|
||||
|
||||
**Please update your codebase.**""", DeprecationWarning, stacklevel=3)
|
||||
everything_broken = '''Unknown error: the response is not in JSON.
|
||||
Something is broken server-side, please send us everything that follows (careful with the auth key):
|
||||
Request headers:
|
||||
{}
|
||||
Request body:
|
||||
{}
|
||||
Response (if any):
|
||||
{}'''
|
||||
|
||||
|
||||
try:
|
||||
warning_2020()
|
||||
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse # noqa
|
||||
from .api import PyMISP # noqa
|
||||
from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa
|
||||
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy, MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting # noqa
|
||||
from .tools import AbstractMISPObjectGenerator # noqa
|
||||
|
@ -39,8 +33,8 @@ try:
|
|||
from .tools import openioc # noqa
|
||||
from .tools import ext_lookups # noqa
|
||||
|
||||
if sys.version_info >= (3, 6):
|
||||
from .aping import ExpandedPyMISP # noqa
|
||||
from .api import PyMISP # noqa
|
||||
from .api import PyMISP as ExpandedPyMISP # noqa
|
||||
from .tools import load_warninglists # noqa
|
||||
# Let's not bother with old python
|
||||
try:
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import sys
|
||||
import datetime
|
||||
|
||||
from deprecated import deprecated
|
||||
|
@ -27,78 +26,19 @@ from enum import Enum
|
|||
from .exceptions import PyMISPInvalidFormat, PyMISPError
|
||||
|
||||
|
||||
from collections.abc import MutableMapping
|
||||
from functools import lru_cache
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger('pymisp')
|
||||
|
||||
if sys.version_info < (3, 0):
|
||||
from collections import MutableMapping
|
||||
import os
|
||||
from cachetools import cached, LRUCache
|
||||
|
||||
resources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
|
||||
misp_objects_path = os.path.join(resources_path, 'misp-objects', 'objects')
|
||||
with open(os.path.join(resources_path, 'describeTypes.json'), 'r') as f:
|
||||
resources_path = Path(__file__).parent / 'data'
|
||||
misp_objects_path = resources_path / 'misp-objects' / 'objects'
|
||||
with (resources_path / 'describeTypes.json').open('r') as f:
|
||||
describe_types = load(f)['result']
|
||||
|
||||
# This is required because Python 2 is a pain.
|
||||
from datetime import tzinfo, timedelta
|
||||
|
||||
class UTC(tzinfo):
|
||||
"""UTC"""
|
||||
|
||||
def utcoffset(self, dt):
|
||||
return timedelta(0)
|
||||
|
||||
def tzname(self, dt):
|
||||
return "UTC"
|
||||
|
||||
def dst(self, dt):
|
||||
return timedelta(0)
|
||||
|
||||
class MISPFileCache(object):
|
||||
# cache up to 150 JSON structures in class attribute
|
||||
|
||||
@staticmethod
|
||||
@cached(cache=LRUCache(maxsize=150))
|
||||
def _load_json(path):
|
||||
if not os.path.exists(path):
|
||||
return None
|
||||
with open(path, 'r') as f:
|
||||
data = load(f)
|
||||
return data
|
||||
|
||||
elif sys.version_info < (3, 4):
|
||||
from collections.abc import MutableMapping
|
||||
from functools import lru_cache
|
||||
import os
|
||||
|
||||
resources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
|
||||
misp_objects_path = os.path.join(resources_path, 'misp-objects', 'objects')
|
||||
with open(os.path.join(resources_path, 'describeTypes.json'), 'r') as f:
|
||||
describe_types = load(f)['result']
|
||||
|
||||
class MISPFileCache(object):
|
||||
# cache up to 150 JSON structures in class attribute
|
||||
|
||||
@staticmethod
|
||||
@lru_cache(maxsize=150)
|
||||
def _load_json(path):
|
||||
if not os.path.exists(path):
|
||||
return None
|
||||
with open(path, 'r') as f:
|
||||
data = load(f)
|
||||
return data
|
||||
|
||||
else:
|
||||
from collections.abc import MutableMapping
|
||||
from functools import lru_cache
|
||||
from pathlib import Path
|
||||
|
||||
resources_path = Path(__file__).parent / 'data'
|
||||
misp_objects_path = resources_path / 'misp-objects' / 'objects'
|
||||
with (resources_path / 'describeTypes.json').open('r') as f:
|
||||
describe_types = load(f)['result']
|
||||
|
||||
class MISPFileCache(object):
|
||||
class MISPFileCache(object):
|
||||
# cache up to 150 JSON structures in class attribute
|
||||
|
||||
@staticmethod
|
||||
|
@ -191,7 +131,7 @@ class AbstractMISP(MutableMapping, MISPFileCache):
|
|||
To do so, you need to call the respective add_* or update_*
|
||||
methods in ExpandedPyMISP/PyMISP.
|
||||
"""
|
||||
super(AbstractMISP, self).__init__()
|
||||
super().__init__()
|
||||
self.__edited = True # As we create a new object, we assume it is edited
|
||||
self.__not_jsonable = []
|
||||
self.__self_defined_describe_types = None
|
||||
|
@ -230,7 +170,7 @@ class AbstractMISP(MutableMapping, MISPFileCache):
|
|||
|
||||
@misp_objects_path.setter
|
||||
def misp_objects_path(self, misp_objects_path):
|
||||
if sys.version_info >= (3, 0) and isinstance(misp_objects_path, str):
|
||||
if isinstance(misp_objects_path, str):
|
||||
misp_objects_path = Path(misp_objects_path)
|
||||
self.__misp_objects_path = misp_objects_path
|
||||
|
||||
|
@ -362,17 +302,14 @@ class AbstractMISP(MutableMapping, MISPFileCache):
|
|||
# The private members don't matter
|
||||
# If we already have a key with that name, we're modifying it.
|
||||
self.__edited = True
|
||||
super(AbstractMISP, self).__setattr__(name, value)
|
||||
super().__setattr__(name, value)
|
||||
|
||||
def _datetime_to_timestamp(self, d):
|
||||
"""Convert a datetime.datetime object to a timestamp (int)"""
|
||||
if isinstance(d, (int, float, str)) or (sys.version_info < (3, 0) and isinstance(d, unicode)):
|
||||
if isinstance(d, (int, float, str)):
|
||||
# Assume we already have a timestamp
|
||||
return int(d)
|
||||
if sys.version_info >= (3, 3):
|
||||
return int(d.timestamp())
|
||||
else:
|
||||
return int((d - datetime.datetime.fromtimestamp(0, UTC())).total_seconds())
|
||||
|
||||
def __add_tag(self, tag=None, **kwargs):
|
||||
"""Add a tag to the attribute (by name or a MISPTag object)"""
|
||||
|
@ -422,13 +359,10 @@ class MISPTag(AbstractMISP):
|
|||
|
||||
_fields_for_feed = {'name', 'colour'}
|
||||
|
||||
def __init__(self):
|
||||
super(MISPTag, self).__init__()
|
||||
|
||||
def from_dict(self, **kwargs):
|
||||
if kwargs.get('Tag'):
|
||||
kwargs = kwargs.get('Tag')
|
||||
super(MISPTag, self).from_dict(**kwargs)
|
||||
super().from_dict(**kwargs)
|
||||
|
||||
def _set_default(self):
|
||||
if not hasattr(self, 'colour'):
|
||||
|
@ -437,4 +371,4 @@ class MISPTag(AbstractMISP):
|
|||
def _to_feed(self):
|
||||
if hasattr(self, 'exportable') and not self.exportable:
|
||||
return False
|
||||
return super(MISPTag, self)._to_feed()
|
||||
return super()._to_feed()
|
||||
|
|
4329
pymisp/api.py
4329
pymisp/api.py
File diff suppressed because it is too large
Load Diff
2243
pymisp/aping.py
2243
pymisp/aping.py
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -1,5 +1,3 @@
|
|||
import sys
|
||||
|
||||
from .vtreportobject import VTReportObject # noqa
|
||||
from .neo4j import Neo4j # noqa
|
||||
from .fileobject import FileObject # noqa
|
||||
|
@ -16,14 +14,13 @@ from .domainipobject import DomainIPObject # noqa
|
|||
from .asnobject import ASNObject # noqa
|
||||
from .geolocationobject import GeolocationObject # noqa
|
||||
|
||||
if sys.version_info >= (3, 6):
|
||||
from .emailobject import EMailObject # noqa
|
||||
from .vehicleobject import VehicleObject # noqa
|
||||
from .csvloader import CSVLoader # noqa
|
||||
from .sshauthkeyobject import SSHAuthorizedKeysObject # noqa
|
||||
from .feed import feed_meta_generator # noqa
|
||||
try:
|
||||
from .emailobject import EMailObject # noqa
|
||||
from .vehicleobject import VehicleObject # noqa
|
||||
from .csvloader import CSVLoader # noqa
|
||||
from .sshauthkeyobject import SSHAuthorizedKeysObject # noqa
|
||||
from .feed import feed_meta_generator # noqa
|
||||
try:
|
||||
from .urlobject import URLObject # noqa
|
||||
except ImportError:
|
||||
except ImportError:
|
||||
# Requires faup, which is a bit difficult to install
|
||||
pass
|
||||
|
|
9
setup.py
9
setup.py
|
@ -34,15 +34,12 @@ setup(
|
|||
'Intended Audience :: Science/Research',
|
||||
'Intended Audience :: Telecommunications Industry',
|
||||
'Intended Audience :: Information Technology',
|
||||
'Programming Language :: Python :: 2.7',
|
||||
'Programming Language :: Python :: 3',
|
||||
'Programming Language :: Python :: 3.6',
|
||||
'Topic :: Security',
|
||||
'Topic :: Internet',
|
||||
],
|
||||
install_requires=['six', 'requests', 'python-dateutil', 'jsonschema',
|
||||
'python-dateutil', 'enum34;python_version<"3.4"',
|
||||
'functools32;python_version<"3.0"', 'deprecated', 'cachetools;python_version<"3.0"'],
|
||||
extras_require={'fileobjects': ['lief>=0.8,<0.10;python_version<"3.5"', 'lief>=0.10.1;python_version>"3.5"', 'python-magic', 'pydeep'],
|
||||
install_requires=['six', 'requests', 'python-dateutil', 'jsonschema', 'deprecated'],
|
||||
extras_require={'fileobjects': ['lief>=0.10.1', 'python-magic', 'pydeep'],
|
||||
'neo': ['py2neo'],
|
||||
'openioc': ['beautifulsoup4'],
|
||||
'virustotal': ['validators'],
|
||||
|
|
314
tests/test.py
314
tests/test.py
|
@ -1,314 +0,0 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
from pymisp import PyMISP, __version__
|
||||
try:
|
||||
from keys import url, key
|
||||
except ImportError as e:
|
||||
print(e)
|
||||
url = 'https://localhost:8443'
|
||||
key = 'd6OmdDFvU3Seau3UjwvHS1y3tFQbaRNhJhDX0tjh'
|
||||
|
||||
import time
|
||||
|
||||
import unittest
|
||||
|
||||
|
||||
class TestBasic(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.maxDiff = None
|
||||
self.misp = PyMISP(url, key, False, 'json')
|
||||
self.live_describe_types = self.misp.get_live_describe_types()
|
||||
|
||||
def _clean_event(self, event):
|
||||
event['Event'].pop('orgc_id', None)
|
||||
event['Event'].pop('uuid', None)
|
||||
event['Event'].pop('sharing_group_id', None)
|
||||
event['Event'].pop('timestamp', None)
|
||||
event['Event'].pop('org_id', None)
|
||||
event['Event'].pop('date', None)
|
||||
event['Event'].pop('RelatedEvent', None)
|
||||
event['Event'].pop('publish_timestamp', None)
|
||||
if event['Event'].get('Attribute'):
|
||||
for a in event['Event'].get('Attribute'):
|
||||
a.pop('uuid', None)
|
||||
a.pop('event_id', None)
|
||||
a.pop('id', None)
|
||||
a.pop('timestamp', None)
|
||||
if event['Event'].get('Orgc'):
|
||||
event['Event']['Orgc'].pop('uuid', None)
|
||||
event['Event']['Orgc'].pop('id', None)
|
||||
if event['Event'].get('Org'):
|
||||
event['Event']['Org'].pop('uuid', None)
|
||||
event['Event']['Org'].pop('id', None)
|
||||
return event['Event'].pop('id', None)
|
||||
|
||||
def new_event(self):
|
||||
event = self.misp.new_event(0, 1, 0, "This is a test")
|
||||
event_id = self._clean_event(event)
|
||||
to_check = {u'Event': {u'info': u'This is a test', u'locked': False,
|
||||
u'attribute_count': u'0', 'disable_correlation': False, u'analysis': u'0',
|
||||
u'ShadowAttribute': [], u'published': False,
|
||||
u'distribution': u'0', u'event_creator_email': u'admin@admin.test', u'Attribute': [], u'proposal_email_lock': False,
|
||||
u'extends_uuid': '',
|
||||
u'Object': [], u'Org': {'local': True, u'name': u'ORGNAME'},
|
||||
u'Orgc': {'local': True, u'name': u'ORGNAME'},
|
||||
u'Galaxy': [],
|
||||
u'threat_level_id': u'1'}}
|
||||
self.assertEqual(event, to_check, 'Failed at creating a new Event')
|
||||
return int(event_id)
|
||||
|
||||
def add_hashes(self, eventid):
|
||||
r = self.misp.get_event(eventid)
|
||||
event = r.json()
|
||||
event = self.misp.add_hashes(event,
|
||||
category='Payload installation',
|
||||
filename='dll_installer.dll',
|
||||
md5='0a209ac0de4ac033f31d6ba9191a8f7a',
|
||||
sha1='1f0ae54ac3f10d533013f74f48849de4e65817a7',
|
||||
sha256='003315b0aea2fcb9f77d29223dd8947d0e6792b3a0227e054be8eb2a11f443d9',
|
||||
ssdeep=None,
|
||||
comment='Fanny modules',
|
||||
to_ids=False,
|
||||
distribution=2,
|
||||
proposal=False)
|
||||
self._clean_event(event)
|
||||
to_check = {u'Event': {u'info': u'This is a test', u'locked': False,
|
||||
u'attribute_count': u'3', u'analysis': u'0',
|
||||
u'ShadowAttribute': [], u'published': False, u'distribution': u'0', u'event_creator_email': u'admin@admin.test',
|
||||
u'Org': {'local': True, u'name': u'ORGNAME'},
|
||||
u'Orgc': {'local': True, u'name': u'ORGNAME'},
|
||||
u'Galaxy': [],
|
||||
u'Attribute': [
|
||||
{u'category': u'Payload installation', u'comment': u'Fanny modules',
|
||||
u'to_ids': False, u'value': u'dll_installer.dll|0a209ac0de4ac033f31d6ba9191a8f7a',
|
||||
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|md5'},
|
||||
{u'category': u'Payload installation', u'comment': u'Fanny modules',
|
||||
u'to_ids': False, u'value': u'dll_installer.dll|1f0ae54ac3f10d533013f74f48849de4e65817a7',
|
||||
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|sha1'},
|
||||
{u'category': u'Payload installation', u'comment': u'Fanny modules',
|
||||
u'to_ids': False, u'value': u'dll_installer.dll|003315b0aea2fcb9f77d29223dd8947d0e6792b3a0227e054be8eb2a11f443d9',
|
||||
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|sha256'}],
|
||||
u'proposal_email_lock': False, u'threat_level_id': u'1'}}
|
||||
self.assertEqual(event, to_check, 'Failed at adding hashes')
|
||||
|
||||
def publish(self, eventid):
|
||||
r = self.misp.get_event(eventid)
|
||||
event = r.json()
|
||||
event = self.misp.publish(event)
|
||||
self._clean_event(event)
|
||||
to_check = {u'Event': {u'info': u'This is a test', u'locked': False,
|
||||
u'attribute_count': u'3', u'analysis': u'0',
|
||||
u'ShadowAttribute': [], u'published': True, u'distribution': u'0', u'event_creator_email': u'admin@admin.test',
|
||||
u'Org': {'local': True, u'name': u'ORGNAME'},
|
||||
u'Orgc': {'local': True, u'name': u'ORGNAME'},
|
||||
u'Galaxy': [],
|
||||
u'Attribute': [
|
||||
{u'category': u'Payload installation', u'comment': u'Fanny modules',
|
||||
u'to_ids': False, u'value': u'dll_installer.dll|0a209ac0de4ac033f31d6ba9191a8f7a',
|
||||
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|md5'},
|
||||
{u'category': u'Payload installation', u'comment': u'Fanny modules',
|
||||
u'to_ids': False, u'value': u'dll_installer.dll|1f0ae54ac3f10d533013f74f48849de4e65817a7',
|
||||
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|sha1'},
|
||||
{u'category': u'Payload installation', u'comment': u'Fanny modules',
|
||||
u'to_ids': False, u'value': u'dll_installer.dll|003315b0aea2fcb9f77d29223dd8947d0e6792b3a0227e054be8eb2a11f443d9',
|
||||
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|sha256'}],
|
||||
u'proposal_email_lock': False, u'threat_level_id': u'1'}}
|
||||
self.assertEqual(event, to_check, 'Failed at publishing event')
|
||||
|
||||
def delete(self, eventid):
|
||||
event = self.misp.delete_event(eventid)
|
||||
print(event)
|
||||
|
||||
def delete_attr(self, attrid):
|
||||
event = self.misp.delete_attribute(attrid)
|
||||
print(event)
|
||||
|
||||
def get(self, eventid):
|
||||
event = self.misp.get_event(eventid)
|
||||
print(event)
|
||||
|
||||
def get_stix(self, **kwargs):
|
||||
event = self.misp.get_stix(kwargs)
|
||||
print(event)
|
||||
|
||||
def add(self):
|
||||
event = {u'Event': {u'info': u'This is a test', u'locked': False,
|
||||
u'attribute_count': u'3', u'analysis': u'0',
|
||||
u'ShadowAttribute': [], u'published': False, u'distribution': u'0', u'event_creator_email': u'admin@admin.test',
|
||||
u'Attribute': [
|
||||
{u'category': u'Payload installation', u'comment': u'Fanny modules',
|
||||
u'to_ids': False, u'value': u'dll_installer.dll|0a209ac0de4ac033f31d6ba9191a8f7a',
|
||||
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|md5'},
|
||||
{u'category': u'Payload installation', u'comment': u'Fanny modules',
|
||||
u'to_ids': False, u'value': u'dll_installer.dll|1f0ae54ac3f10d533013f74f48849de4e65817a7',
|
||||
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|sha1'},
|
||||
{u'category': u'Payload installation', u'comment': u'Fanny modules',
|
||||
u'to_ids': False, u'value': u'dll_installer.dll|003315b0aea2fcb9f77d29223dd8947d0e6792b3a0227e054be8eb2a11f443d9',
|
||||
u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|sha256'}],
|
||||
u'proposal_email_lock': False, u'threat_level_id': u'1'}}
|
||||
event = self.misp.add_event(event)
|
||||
print(event)
|
||||
|
||||
def add_user(self):
|
||||
email = 'test@misp.local'
|
||||
role_id = '5'
|
||||
org_id = '1'
|
||||
password = 'Password1234!'
|
||||
external_auth_required = False
|
||||
external_auth_key = ''
|
||||
enable_password = False
|
||||
nids_sid = '1238717'
|
||||
server_id = '1'
|
||||
gpgkey = ''
|
||||
certif_public = ''
|
||||
autoalert = False
|
||||
contactalert = False
|
||||
disabled = False
|
||||
change_pw = '0'
|
||||
termsaccepted = False
|
||||
newsread = '0'
|
||||
authkey = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
|
||||
to_check = {'User': {'email': email, 'org_id': org_id, 'role_id': role_id,
|
||||
'password': password, 'external_auth_required': external_auth_required,
|
||||
'external_auth_key': external_auth_key, 'enable_password': enable_password,
|
||||
'nids_sid': nids_sid, 'server_id': server_id, 'gpgkey': gpgkey,
|
||||
'certif_public': certif_public, 'autoalert': autoalert,
|
||||
'contactalert': contactalert, 'disabled': disabled,
|
||||
'change_pw': change_pw, 'termsaccepted': termsaccepted,
|
||||
'newsread': newsread, 'authkey': authkey}}
|
||||
user = self.misp.add_user(email=email,
|
||||
role_id=role_id,
|
||||
org_id=org_id,
|
||||
password=password,
|
||||
external_auth_required=external_auth_required,
|
||||
external_auth_key=external_auth_key,
|
||||
enable_password=enable_password,
|
||||
nids_sid=nids_sid,
|
||||
server_id=server_id,
|
||||
gpgkey=gpgkey,
|
||||
certif_public=certif_public,
|
||||
autoalert=autoalert,
|
||||
contactalert=contactalert,
|
||||
disabled=disabled,
|
||||
change_pw=change_pw,
|
||||
termsaccepted=termsaccepted,
|
||||
newsread=newsread,
|
||||
authkey=authkey)
|
||||
# delete user to allow reuse of test
|
||||
uid = user.get('User').get('id')
|
||||
self.misp.delete_user(uid)
|
||||
# ----------------------------------
|
||||
# test interesting keys only (some keys are modified(password) and some keys are added (lastlogin)
|
||||
tested_keys = ['email', 'org_id', 'role_id', 'server_id', 'autoalert',
|
||||
'authkey', 'gpgkey', 'certif_public', 'nids_sid', 'termsaccepted',
|
||||
'newsread', 'contactalert', 'disabled']
|
||||
for k in tested_keys:
|
||||
self.assertEqual(user.get('User').get(k), to_check.get('User').get(k), "Failed to match input with output on key: {}".format(k))
|
||||
|
||||
def add_organisation(self):
|
||||
name = 'Organisation tests'
|
||||
description = 'This is a test organisation'
|
||||
orgtype = 'Type is a string'
|
||||
nationality = 'French'
|
||||
sector = 'Bank sector'
|
||||
uuid = '16fd2706-8baf-433b-82eb-8c7fada847da'
|
||||
contacts = 'Text field with no limitations'
|
||||
local = False
|
||||
to_check = {'Organisation': {'name': name, 'description': description,
|
||||
'type': orgtype, 'nationality': nationality,
|
||||
'sector': sector, 'uuid': uuid, 'contacts': contacts,
|
||||
'local': local}}
|
||||
org = self.misp.add_organisation(name=name,
|
||||
description=description,
|
||||
type=orgtype,
|
||||
nationality=nationality,
|
||||
sector=sector,
|
||||
uuid=uuid,
|
||||
contacts=contacts,
|
||||
local=local,
|
||||
)
|
||||
# delete organisation to allow reuse of test
|
||||
oid = org.get('Organisation').get('id')
|
||||
self.misp.delete_organisation(oid)
|
||||
# ----------------------------------
|
||||
tested_keys = ['anonymise', 'contacts', 'description', 'local', 'name',
|
||||
'nationality', 'sector', 'type', 'uuid']
|
||||
for k in tested_keys:
|
||||
self.assertEqual(org.get('Organisation').get(k), to_check.get('Organisation').get(k), "Failed to match input with output on key: {}".format(k))
|
||||
|
||||
def test_create_event(self):
|
||||
eventid = self.new_event()
|
||||
time.sleep(1)
|
||||
self.delete(eventid)
|
||||
|
||||
def test_get_event(self):
|
||||
eventid = self.new_event()
|
||||
time.sleep(1)
|
||||
self.get(eventid)
|
||||
time.sleep(1)
|
||||
self.delete(eventid)
|
||||
|
||||
def test_add_event(self):
|
||||
self.add()
|
||||
time.sleep(1)
|
||||
self.delete(1)
|
||||
|
||||
def test_del_attr(self):
|
||||
eventid = self.new_event()
|
||||
time.sleep(1)
|
||||
self.delete_attr(1)
|
||||
time.sleep(1)
|
||||
self.delete(eventid)
|
||||
|
||||
def test_one_or_more(self):
|
||||
self.assertEqual(self.misp._one_or_more(1), (1,))
|
||||
self.assertEqual(self.misp._one_or_more([1]), [1])
|
||||
|
||||
def test_create_user(self):
|
||||
self.add_user()
|
||||
|
||||
def test_create_organisation(self):
|
||||
self.add_organisation()
|
||||
|
||||
def test_describeTypes_sane_default(self):
|
||||
sane_default = self.live_describe_types['sane_defaults']
|
||||
self.assertEqual(sorted(sane_default.keys()), sorted(self.live_describe_types['types']))
|
||||
|
||||
def test_describeTypes_categories(self):
|
||||
category_type_mappings = self.live_describe_types['category_type_mappings']
|
||||
self.assertEqual(sorted(category_type_mappings.keys()), sorted(self.live_describe_types['categories']))
|
||||
|
||||
def test_describeTypes_types_in_categories(self):
|
||||
category_type_mappings = self.live_describe_types['category_type_mappings']
|
||||
for category, types in category_type_mappings.items():
|
||||
existing_types = [t for t in types if t in self.live_describe_types['types']]
|
||||
self.assertEqual(sorted(existing_types), sorted(types))
|
||||
|
||||
def test_describeTypes_types_have_category(self):
|
||||
category_type_mappings = self.live_describe_types['category_type_mappings']
|
||||
all_types = set()
|
||||
for category, types in category_type_mappings.items():
|
||||
all_types.update(types)
|
||||
self.assertEqual(sorted(list(all_types)), sorted(self.live_describe_types['types']))
|
||||
|
||||
def test_describeTypes_sane_default_valid_category(self):
|
||||
sane_default = self.live_describe_types['sane_defaults']
|
||||
categories = self.live_describe_types['categories']
|
||||
for t, sd in sane_default.items():
|
||||
self.assertTrue(sd['to_ids'] in [0, 1])
|
||||
self.assertTrue(sd['default_category'] in categories)
|
||||
|
||||
def test_live_acl(self):
|
||||
query_acl = self.misp.get_live_query_acl()
|
||||
self.assertEqual(query_acl['response'], [])
|
||||
|
||||
def test_recommended_pymisp_version(self):
|
||||
response = self.misp.get_recommended_api_version()
|
||||
recommended_version_tup = tuple(int(x) for x in response['version'].split('.'))
|
||||
pymisp_version_tup = tuple(int(x) for x in __version__.split('.'))[:3]
|
||||
self.assertEqual(recommended_version_tup, pymisp_version_tup)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
|
@ -3,11 +3,6 @@
|
|||
set -e
|
||||
set -x
|
||||
|
||||
if [ ${LEGACY} == true ]; then
|
||||
pip install nose coveralls codecov requests-mock pydeep
|
||||
pip install .[fileobjects]
|
||||
else
|
||||
# We're in python3, installing with pipenv.
|
||||
pip install pipenv
|
||||
pipenv update --dev
|
||||
fi
|
||||
# We're in python3, installing with pipenv.
|
||||
pip install pipenv
|
||||
pipenv update --dev
|
||||
|
|
|
@ -3,9 +3,4 @@
|
|||
set -e
|
||||
set -x
|
||||
|
||||
if [ -z ${LEGACY} ]; then
|
||||
# We're in python3, test all and use pipenv.
|
||||
pipenv run nosetests-3.4 --with-coverage --cover-package=pymisp,tests --cover-tests tests/test_*.py
|
||||
else
|
||||
nosetests --with-coverage --cover-package=pymisp,tests --cover-tests tests/test_mispevent.py
|
||||
fi
|
||||
pipenv run nosetests-3.4 --with-coverage --cover-package=pymisp,tests --cover-tests tests/test_*.py
|
||||
|
|
Loading…
Reference in New Issue