mirror of https://github.com/MISP/PyMISP
commit
15545c7e65
|
@ -26,14 +26,15 @@ Response (if any):
|
|||
|
||||
try:
|
||||
warning_2022()
|
||||
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse # noqa
|
||||
from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa
|
||||
from .exceptions import (PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, # noqa
|
||||
InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse)
|
||||
from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa
|
||||
from .mispevent import (MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, # noqa
|
||||
MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy,
|
||||
MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed,
|
||||
MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist,
|
||||
MISPEventReport, MISPGalaxyCluster, MISPGalaxyClusterElement, MISPGalaxyClusterRelation,
|
||||
MISPCorrelationExclusion, MISPGalaxy, MISPDecayingModel)
|
||||
MISPEventReport, MISPCorrelationExclusion, MISPDecayingModel, MISPGalaxy, MISPGalaxyCluster,
|
||||
MISPGalaxyClusterElement, MISPGalaxyClusterRelation)
|
||||
from .tools import AbstractMISPObjectGenerator # noqa
|
||||
from .tools import Neo4j # noqa
|
||||
from .tools import stix # noqa
|
||||
|
|
|
@ -65,6 +65,10 @@ class UnknownMISPObjectTemplate(MISPObjectException):
|
|||
pass
|
||||
|
||||
|
||||
class InvalidMISPGalaxy(PyMISPError):
|
||||
pass
|
||||
|
||||
|
||||
class PyMISPInvalidFormat(PyMISPError):
|
||||
pass
|
||||
|
||||
|
|
|
@ -17,7 +17,9 @@ from pathlib import Path
|
|||
from typing import List, Optional, Union, IO, Dict, Any
|
||||
|
||||
from .abstract import AbstractMISP, MISPTag
|
||||
from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError, NewEventReportError, NewGalaxyClusterError, NewGalaxyClusterRelationError
|
||||
from .exceptions import (UnknownMISPObjectTemplate, InvalidMISPGalaxy, InvalidMISPObject,
|
||||
PyMISPError, NewEventError, NewAttributeError, NewEventReportError,
|
||||
NewGalaxyClusterError, NewGalaxyClusterRelationError)
|
||||
|
||||
logger = logging.getLogger('pymisp')
|
||||
|
||||
|
@ -277,6 +279,7 @@ class MISPAttribute(AbstractMISP):
|
|||
self.SharingGroup: MISPSharingGroup
|
||||
self.Sighting: List[MISPSighting] = []
|
||||
self.Tag: List[MISPTag] = []
|
||||
self.Galaxy: List[MISPGalaxy] = []
|
||||
|
||||
# For search
|
||||
self.Event: MISPEvent
|
||||
|
@ -298,6 +301,27 @@ class MISPAttribute(AbstractMISP):
|
|||
"""Set a list of prepared MISPTag."""
|
||||
super()._set_tags(tags)
|
||||
|
||||
def add_galaxy(self, galaxy: Union['MISPGalaxy', dict, None] = None, **kwargs) -> 'MISPGalaxy':
|
||||
"""Add a galaxy to the Attribute, either by passing a MISPGalaxy or a dictionary"""
|
||||
if isinstance(galaxy, MISPGalaxy):
|
||||
self.galaxies.append(galaxy)
|
||||
return galaxy
|
||||
if isinstance(galaxy, dict):
|
||||
misp_galaxy = MISPGalaxy()
|
||||
misp_galaxy.from_dict(**galaxy)
|
||||
elif kwargs:
|
||||
misp_galaxy = MISPGalaxy()
|
||||
misp_galaxy.from_dict(**kwargs)
|
||||
else:
|
||||
raise InvalidMISPGalaxy("A Galaxy to add to an existing Attribute needs to be either a MISPGalaxy or a plain python dictionary")
|
||||
self.galaxies.append(misp_galaxy)
|
||||
return misp_galaxy
|
||||
|
||||
@property
|
||||
def galaxies(self) -> List['MISPGalaxy']:
|
||||
"""Returns a list of galaxies associated to this Attribute"""
|
||||
return self.Galaxy
|
||||
|
||||
def _prepare_data(self, data: Optional[Union[Path, str, bytes, BytesIO]]):
|
||||
if not data:
|
||||
super().__setattr__('data', None)
|
||||
|
@ -588,6 +612,8 @@ class MISPAttribute(AbstractMISP):
|
|||
|
||||
if kwargs.get('Tag'):
|
||||
[self.add_tag(tag) for tag in kwargs.pop('Tag')]
|
||||
if kwargs.get('Galaxy'):
|
||||
[self.add_galaxy(galaxy) for galaxy in kwargs.pop('Galaxy')]
|
||||
if kwargs.get('Sighting'):
|
||||
[self.add_sighting(sighting) for sighting in kwargs.pop('Sighting')]
|
||||
if kwargs.get('ShadowAttribute'):
|
||||
|
@ -1945,13 +1971,23 @@ class MISPEvent(AbstractMISP):
|
|||
self.edited = True
|
||||
return event_report
|
||||
|
||||
def add_galaxy(self, **kwargs) -> MISPGalaxy:
|
||||
"""Add a MISP galaxy and sub-clusters into an event.
|
||||
def add_galaxy(self, galaxy: Union[MISPGalaxy, dict, None] = None, **kwargs) -> MISPGalaxy:
|
||||
"""Add a galaxy and sub-clusters into an event, either by passing
|
||||
a MISPGalaxy or a dictionary.
|
||||
Supports all other parameters supported by MISPGalaxy"""
|
||||
galaxy = MISPGalaxy()
|
||||
galaxy.from_dict(**kwargs)
|
||||
self.galaxies.append(galaxy)
|
||||
return galaxy
|
||||
if isinstance(galaxy, MISPGalaxy):
|
||||
self.galaxies.append(galaxy)
|
||||
return galaxy
|
||||
if isinstance(galaxy, dict):
|
||||
misp_galaxy = MISPGalaxy()
|
||||
misp_galaxy.from_dict(**galaxy)
|
||||
elif kwargs:
|
||||
misp_galaxy = MISPGalaxy()
|
||||
misp_galaxy.from_dict(**kwargs)
|
||||
else:
|
||||
raise InvalidMISPGalaxy("A Galaxy to add to an existing Event needs to be either a MISPGalaxy or a plain python dictionary")
|
||||
self.galaxies.append(misp_galaxy)
|
||||
return misp_galaxy
|
||||
|
||||
def get_object_by_id(self, object_id: Union[str, int]) -> MISPObject:
|
||||
"""Get an object by ID
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
{
|
||||
"uuid": "c5f2dfb4-21a1-42d8-a452-1d3c36a204ff",
|
||||
"name": "Tea Matrix",
|
||||
"type": "tea-matrix",
|
||||
"description": "Tea Matrix",
|
||||
"namespace": "tea-matrix",
|
||||
"GalaxyCluster": [
|
||||
{
|
||||
"collection_uuid": "7eacd736-b093-4cc0-a56c-5f84de725dfb",
|
||||
"type": "tea-matrix",
|
||||
"value": "Milk in tea",
|
||||
"tag_name": "misp-galaxy:tea-matrix=\"Milk in tea\"",
|
||||
"description": "Milk in tea",
|
||||
"uuid": "24430dc6-9c27-4b3c-a5e7-6dda478fffa0",
|
||||
"distribution": "3",
|
||||
"default": true,
|
||||
"meta": {
|
||||
"kill_chain": [
|
||||
"tea:black"
|
||||
]
|
||||
},
|
||||
"relationship_type": "ennemy-of"
|
||||
}
|
||||
]
|
||||
}
|
|
@ -8,8 +8,8 @@ import glob
|
|||
import hashlib
|
||||
from datetime import date, datetime
|
||||
|
||||
from pymisp import (MISPEvent, MISPSighting, MISPTag, MISPOrganisation,
|
||||
MISPObject)
|
||||
from pymisp import (MISPAttribute, MISPEvent, MISPGalaxy, MISPObject, MISPOrganisation,
|
||||
MISPSighting, MISPTag)
|
||||
from pymisp.exceptions import InvalidMISPObject
|
||||
from pymisp.tools import GitVulnFinderObject
|
||||
|
||||
|
@ -68,6 +68,15 @@ class TestMISPEvent(unittest.TestCase):
|
|||
del self.mispevent.uuid
|
||||
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
|
||||
|
||||
def test_event_galaxy(self):
|
||||
self.init_event()
|
||||
with open('tests/mispevent_testfiles/galaxy.json', 'r') as f:
|
||||
galaxy = json.load(f)
|
||||
misp_galaxy = MISPGalaxy()
|
||||
misp_galaxy.from_dict(**galaxy)
|
||||
self.mispevent.add_galaxy(misp_galaxy)
|
||||
self.assertEqual(self.mispevent.galaxies[0].to_json(sort_keys=True, indent=2), json.dumps(galaxy, sort_keys=True, indent=2))
|
||||
|
||||
def test_attribute(self):
|
||||
self.init_event()
|
||||
a = self.mispevent.add_attribute('filename', 'bar.exe')
|
||||
|
@ -87,6 +96,21 @@ class TestMISPEvent(unittest.TestCase):
|
|||
ref_json = json.load(f)
|
||||
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
|
||||
|
||||
def test_attribute_galaxy(self):
|
||||
self.init_event()
|
||||
with open('tests/mispevent_testfiles/galaxy.json', 'r') as f:
|
||||
galaxy = json.load(f)
|
||||
misp_galaxy = MISPGalaxy()
|
||||
misp_galaxy.from_dict(**galaxy)
|
||||
attribute = MISPAttribute()
|
||||
attribute.from_dict(**{'type': 'github-username', 'value': 'adulau'})
|
||||
attribute.add_galaxy(misp_galaxy)
|
||||
self.mispevent.add_attribute(**attribute)
|
||||
self.assertEqual(
|
||||
self.mispevent.attributes[0].galaxies[0].to_json(sort_keys=True, indent=2),
|
||||
json.dumps(galaxy, sort_keys=True, indent=2)
|
||||
)
|
||||
|
||||
def test_to_dict_json_format(self):
|
||||
misp_event = MISPEvent()
|
||||
av_signature_object = MISPObject("av-signature")
|
||||
|
@ -130,6 +154,22 @@ class TestMISPEvent(unittest.TestCase):
|
|||
ref_json = json.load(f)
|
||||
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
|
||||
|
||||
def test_object_galaxy(self):
|
||||
self.init_event()
|
||||
misp_object = MISPObject('github-user')
|
||||
misp_object.add_attribute('username', 'adulau')
|
||||
misp_object.add_attribute('repository', 'cve-search')
|
||||
self.mispevent.add_object(misp_object)
|
||||
with open('tests/mispevent_testfiles/galaxy.json', 'r') as f:
|
||||
galaxy = json.load(f)
|
||||
misp_galaxy = MISPGalaxy()
|
||||
misp_galaxy.from_dict(**galaxy)
|
||||
self.mispevent.objects[0].attributes[0].add_galaxy(misp_galaxy)
|
||||
self.assertEqual(
|
||||
self.mispevent.objects[0].attributes[0].galaxies[0].to_json(sort_keys=True, indent=2),
|
||||
json.dumps(galaxy, sort_keys=True, indent=2)
|
||||
)
|
||||
|
||||
def test_malware(self):
|
||||
with open('tests/mispevent_testfiles/simple.json', 'rb') as f:
|
||||
pseudofile = BytesIO(f.read())
|
||||
|
|
Loading…
Reference in New Issue