Merge branch 'main' of github.com:MISP/PyMISP into main

pull/919/head
Alexandre Dulaunoy 2023-02-06 18:18:57 +01:00
commit a272ca56cd
No known key found for this signature in database
GPG Key ID: 09E2CD4944E6CBCD
20 changed files with 5076 additions and 4108 deletions

View File

@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.7, 3.8, 3.9, '3.10']
python-version: [3.8, 3.9, '3.10', '3.11']
steps:

View File

@ -1,7 +1,7 @@
version: 2
python:
version: 3.7
version: 3.8
install:
- method: pip
path: .

View File

@ -2,11 +2,227 @@ Changelog
=========
v2.4.168 (2023-01-23)
---------------------
Changes
~~~~~~~
- Bump version. [Raphaël Vinot]
- Bump deps. [Raphaël Vinot]
- Bump deps. [Raphaël Vinot]
v2.4.167.2 (2023-01-17)
-----------------------
Changes
~~~~~~~
- Bump changelog. [Raphaël Vinot]
- Bump deps, version. [Raphaël Vinot]
Fix
~~~
- Set relationship_type default in MISPTag to empty string. [Raphaël
Vinot]
- Another typo in readme. [Raphaël Vinot]
- Typo in readme. [Raphaël Vinot]
v2.4.167.1 (2023-01-16)
-----------------------
New
~~~
- Add relationship_type in Tag entries for feeds. [Raphaël Vinot]
Changes
~~~~~~~
- Bump changelog. [Raphaël Vinot]
- Bump version. [Raphaël Vinot]
- Bump deps. [Raphaël Vinot]
- Bump requests. [Raphaël Vinot]
- Bump pyzmq. [Raphaël Vinot]
- Bump deps. [Raphaël Vinot]
- Bump python version used by read the docs. [Raphaël Vinot]
- Bump warning to inform user that python 3.10 wil be required in 12
months. [Raphaël Vinot]
- Bump minimal PyMISP version to 3.8. [Raphaël Vinot]
- Re-bump changelog. [Raphaël Vinot]
Fix
~~~
- Update whl files. [Raphaël Vinot]
- Nvm, readthedocs requires python 3.8 at most. [Raphaël Vinot]
v2.4.167 (2022-12-22)
---------------------
Changes
~~~~~~~
- Bump objects. [Raphaël Vinot]
- Bump changelog. [Raphaël Vinot]
- Bump version. [Raphaël Vinot]
- Bump objects. [Raphaël Vinot]
- Bump dependencies, move to poetry 1.3. [Raphaël Vinot]
- Bump certifi. [Raphaël Vinot]
- Bump deps. [Raphaël Vinot]
- Bump deps. [Raphaël Vinot]
- Re-order classes. [Raphaël Vinot]
Other
~~~~~
- Creation fo "add_attributes_from_csv.py" [Julien Mongenet]
The file aims to ingest a formated CSV file containing attributes for MISP ingestion.
- Graceful handling of tagging when name attribute is missing. [Sura De
Silva]
- Add: Galaxy test sample. [Christian Studer]
- Add: Added very straight forward tests to make sure the galaxy
clusters are properly defined. [Christian Studer]
- Add: Added the `Galaxy` field to MISPAttribute using the MISPGalaxy
class. [Christian Studer]
- Including an `add_galaxy` method similar to the
one used for events
- `attribute.galaxies` gives the list of attached
galaxy clusters
v2.4.166 (2022-11-28)
---------------------
New
~~~
- Basic support for listing, enabling and disabling decaying models.
[Raphaël Vinot]
- [tests] Test for local tags. [Raphaël Vinot]
Changes
~~~~~~~
- Re-bump changelog. [Raphaël Vinot]
- Bump changelog. [Raphaël Vinot]
- Bump deps, version. [Raphaël Vinot]
- [types] added azure-application-id. [iglocska]
- Bump deps. [Raphaël Vinot]
- Bump deps. [Raphaël Vinot]
- Bump deps. [Raphaël Vinot]
Fix
~~~
- [describetypes] updated with the latest output from MISP. [iglocska]
- [types] added missing type value. [iglocska]
v2.4.165.1 (2022-11-10)
-----------------------
Changes
~~~~~~~
- Bump changelog. [Raphaël Vinot]
- Bump deps. [Raphaël Vinot]
Fix
~~~
- Properly bump version. [Raphaël Vinot]
Other
~~~~~
- Update __init__.py. [Marcelo Chaves]
Regardless of running the latest PyMISP version, the message below is presented:
```
The version of PyMISP recommended by the MISP instance (2.4.165) is newer than the one you're using now (2.4.162.1). Please upgrade PyMISP.
```
v2.4.165 (2022-11-09)
---------------------
Changes
~~~~~~~
- Bump changelog. [Raphaël Vinot]
- Bump mypy. [Raphaël Vinot]
- Add links to doc. [Raphaël Vinot]
- Bump deps. [Raphaël Vinot]
Fix
~~~
- Issue with EMailObject. [Raphaël Vinot]
v2.4.162.2 (2022-11-02)
-----------------------
New
~~~
- Add in ability to set a taxonomies required status. [Tom King]
Changes
~~~~~~~
- Bump changelog. [Raphaël Vinot]
- Bump lief (CVEs), version. [Raphaël Vinot]
- Bump deps. [Raphaël Vinot]
- [misp-objects] updated to the latest version. [Alexandre Dulaunoy]
- [tests] fix the list name test following latest warning-list updates.
[Alexandre Dulaunoy]
- Bump deps. [Raphaël Vinot]
- Add dependabot. [Raphaël Vinot]
Other
~~~~~
- Revert "chg: [tests] fix the list name test following latest warning-
list" [Alexandre Dulaunoy]
This reverts commit be3715595bcf08d497303198fefdf91c735b3fb2.
- Build(deps): bump actions/setup-python from 2 to 4. [dependabot[bot]]
Bumps [actions/setup-python](https://github.com/actions/setup-python) from 2 to 4.
- [Release notes](https://github.com/actions/setup-python/releases)
- [Commits](https://github.com/actions/setup-python/compare/v2...v4)
---
updated-dependencies:
- dependency-name: actions/setup-python
dependency-type: direct:production
update-type: version-update:semver-major
...
- Build(deps): bump actions/checkout from 2 to 3. [dependabot[bot]]
Bumps [actions/checkout](https://github.com/actions/checkout) from 2 to 3.
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](https://github.com/actions/checkout/compare/v2...v3)
---
updated-dependencies:
- dependency-name: actions/checkout
dependency-type: direct:production
update-type: version-update:semver-major
...
- Build(deps): bump codecov/codecov-action from 1 to 3.
[dependabot[bot]]
Bumps [codecov/codecov-action](https://github.com/codecov/codecov-action) from 1 to 3.
- [Release notes](https://github.com/codecov/codecov-action/releases)
- [Changelog](https://github.com/codecov/codecov-action/blob/main/CHANGELOG.md)
- [Commits](https://github.com/codecov/codecov-action/compare/v1...v3)
---
updated-dependencies:
- dependency-name: codecov/codecov-action
dependency-type: direct:production
update-type: version-update:semver-major
...
- Create codeql-analysis.yml. [Raphaël Vinot]
v2.4.162.1 (2022-10-02)
-----------------------
Changes
~~~~~~~
- Bump changelog. [Raphaël Vinot]
- Bump deps and version. [Raphaël Vinot]
Fix LIEF vuln.

View File

@ -1,10 +1,10 @@
**IMPORTANT NOTE**: This library will require **at least** python 3.8 starting the 1st of January 2022. If you have legacy versions of python, please use the latest PyMISP version that will be released in December 2021, and consider updating your system(s). Anything released within the last 2 years will do, starting with Ubuntu 20.04.
**IMPORTANT NOTE**: This library will require **at least** Python 3.10 starting the 1st of January 2024. If you have legacy versions of python, please use the latest PyMISP version that will be released in December 2023, and consider updating your system(s). Anything released within the last 2 years will do, starting with Ubuntu 22.04.
# PyMISP - Python Library to access MISP
[![Documentation Status](https://readthedocs.org/projects/pymisp/badge/?version=latest)](http://pymisp.readthedocs.io/?badge=latest)
[![Coverage Status](https://coveralls.io/repos/github/MISP/PyMISP/badge.svg?branch=main)](https://coveralls.io/github/MISP/PyMISP?branch=main)
[![Python 3.6](https://img.shields.io/badge/python-3.6+-blue.svg)](https://www.python.org/downloads/release/python-360/)
[![Python 3.8](https://img.shields.io/badge/python-3.8+-blue.svg)](https://www.python.org/downloads/release/python-380/)
[![PyPi version](https://img.shields.io/pypi/v/pymisp.svg)](https://pypi.python.org/pypi/pymisp/)
[![Number of PyPI downloads](https://img.shields.io/pypi/dm/pymisp.svg)](https://pypi.python.org/pypi/pymisp/)
@ -33,7 +33,7 @@ And there are a few optional dependencies:
* email: to generate MISP Email objects
* brotli: to use the brotli compression when interacting with a MISP instance
Example:
Example:
```
pip3 install pymisp[virustotal,email]

View File

@ -0,0 +1,74 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import csv
from pymisp import PyMISP
from pymisp import ExpandedPyMISP, MISPAttribute
from keys import misp_url, misp_key, misp_verifycert
from requests.packages.urllib3.exceptions import InsecureRequestWarning
import argparse
import urllib3
import requests
requests.packages.urllib3.disable_warnings()
"""
Sample usage:
python3 add_filetype_object_from_csv.py -e <Event_UUID> -f <formated_file_with_attributes>.csv
Attribute CSV file (aach line is an entry):
value;category;type;comment;to_ids;first_seen;last_seen;tag1;tag2
test.pdf;Payload delivery;filename;Email attachment;0;1970-01-01;1970-01-01;tlp:green;ransomware
127.0.0.1;Network activity;ip-dst;C2 server;1;;;tlp:white;
value = IOC's value
category = its MISP category (https://www.circl.lu/doc/misp/categories-and-types/)
type = its MISP type (https://www.circl.lu/doc/misp/categories-and-types/)
comment = IOC's description
to_ids = Boolean expected (0 = IDS flag not checked // 1 = IDS flag checked)
first_seen = First seen date, if any (left empty if not)
last_seen = Last seen date, if any (left empty if not)
tag = IOC tag, if any
"""
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Add attributes to a MISP event from a semi-colon formated csv file')
parser.add_argument("-e", "--event_uuid", required=True, help="Event UUID to update")
parser.add_argument("-f", "--attr_file", required=True, help="Attribute CSV file path")
args = parser.parse_args()
pymisp = ExpandedPyMISP(misp_url, misp_key, misp_verifycert)
f = open(args.attr_file, newline='')
csv_reader = csv.reader(f, delimiter=";")
for line in csv_reader:
value = line[0]
category = line[1]
type = line[2]
comment = line[3]
ids = line[4]
fseen = line[5]
lseen = line[6]
tags = line[7:]
misp_attribute = MISPAttribute()
misp_attribute.value = str(value)
misp_attribute.category = str(category)
misp_attribute.type = str(type)
misp_attribute.comment = str(comment)
misp_attribute.to_ids = str(ids)
if fseen != '':
misp_attribute.first_seen = str(fseen)
if lseen != '':
misp_attribute.last_seen = str(lseen)
for x in tags:
misp_attribute.add_tag(x)
r = pymisp.add_attribute(args.event_uuid, misp_attribute)
print(line)
print("\nAttributes successfully saved :)")

5376
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,4 @@
__version__ = '2.4.162.1'
__version__ = '2.4.168'
import logging
import sys
import warnings
@ -6,11 +6,11 @@ import warnings
logger = logging.getLogger(__name__)
def warning_2022():
if sys.version_info < (3, 8):
def warning_2024():
if sys.version_info < (3, 10):
warnings.warn("""
As our baseline system is the latest Ubuntu LTS, and Ubuntu LTS 20.04 has Python 3.8 available,
we will officially deprecate python versions below 3.8 on January 1st 2022.
As our baseline system is the latest Ubuntu LTS, and Ubuntu LTS 22.04 has Python 3.10 available,
we will officially deprecate python versions below 3.10 on January 1st 2024.
**Please update your codebase.**""", DeprecationWarning, stacklevel=3)
@ -25,15 +25,16 @@ Response (if any):
try:
warning_2022()
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse # noqa
from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa
warning_2024()
from .exceptions import (PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, # noqa
InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse, PyMISPEmptyResponse)
from .abstract import AbstractMISP, MISPEncode, pymisp_json_default, MISPTag, Distribution, ThreatLevel, Analysis # noqa
from .mispevent import (MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, # noqa
MISPOrganisation, MISPSighting, MISPLog, MISPShadowAttribute, MISPWarninglist, MISPTaxonomy,
MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed,
MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist,
MISPEventReport, MISPGalaxyCluster, MISPGalaxyClusterElement, MISPGalaxyClusterRelation,
MISPCorrelationExclusion, MISPGalaxy)
MISPEventReport, MISPCorrelationExclusion, MISPDecayingModel, MISPGalaxy, MISPGalaxyCluster,
MISPGalaxyClusterElement, MISPGalaxyClusterRelation)
from .tools import AbstractMISPObjectGenerator # noqa
from .tools import Neo4j # noqa
from .tools import stix # noqa

View File

@ -102,7 +102,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
__misp_objects_path = misp_objects_path
__describe_types = describe_types
def __init__(self, **kwargs):
def __init__(self, **kwargs: Dict):
"""Abstract class for all the MISP objects.
NOTE: Every method in every classes inheriting this one are doing
changes in memory and do not modify data on a remote MISP instance.
@ -120,7 +120,7 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
# Ignore the edited objects and keep the timestamps.
self.__force_timestamps: bool = True
else:
self.__force_timestamps: bool = False
self.__force_timestamps = False
@property
def describe_types(self) -> Dict:
@ -367,13 +367,14 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
class MISPTag(AbstractMISP):
_fields_for_feed: set = {'name', 'colour'}
_fields_for_feed: set = {'name', 'colour', 'relationship_type'}
def __init__(self, **kwargs):
def __init__(self, **kwargs: Dict):
super().__init__(**kwargs)
self.name: str
self.exportable: bool
self.local: bool
self.relationship_type: Optional[str]
def from_dict(self, **kwargs):
if kwargs.get('Tag'):
@ -381,6 +382,8 @@ class MISPTag(AbstractMISP):
super().from_dict(**kwargs)
def _set_default(self):
if not hasattr(self, 'relationship_type'):
self.relationship_type = ''
if not hasattr(self, 'colour'):
self.colour = '#ffffff'

View File

@ -25,7 +25,7 @@ from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObje
MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, \
MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \
MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, \
MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion
MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion, MISPDecayingModel
from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types
@ -90,7 +90,7 @@ def get_uuid_or_id_from_abstract_misp(obj: Union[AbstractMISP, int, str, UUID, d
def register_user(misp_url: str, email: str,
organisation: Union[MISPOrganisation, int, str, UUID] = None,
organisation: Optional[Union[MISPOrganisation, int, str, UUID]] = None,
org_id: Optional[str] = None, org_name: Optional[str] = None,
message: Optional[str] = None, custom_perms: Optional[str] = None,
perm_sync: bool = False, perm_publish: bool = False, perm_admin: bool = False,
@ -155,7 +155,7 @@ class PyMISP:
"""
def __init__(self, url: str, key: str, ssl: bool = True, debug: bool = False, proxies: Optional[MutableMapping[str, str]] = None,
cert: Optional[Union[str, Tuple[str, str]]] = None, auth: AuthBase = None, tool: str = '',
cert: Optional[Union[str, Tuple[str, str]]] = None, auth: Optional[AuthBase] = None, tool: str = '',
timeout: Optional[Union[float, Tuple[float, float]]] = None,
http_headers: Optional[Dict[str, str]]=None
):
@ -332,7 +332,7 @@ class PyMISP:
# ## BEGIN Event ##
def events(self, pythonify: bool = False) -> Union[Dict, List[MISPEvent]]:
"""Get all the events from the MISP instance
"""Get all the events from the MISP instance: https://www.misp-project.org/openapi/#tag/Events/operation/getEvents
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
@ -353,7 +353,7 @@ class PyMISP:
pythonify: bool = False) -> Union[Dict, MISPEvent]:
"""Get an event from a MISP instance. Includes collections like
Attribute, EventReport, Feed, Galaxy, Object, Tag, etc. so the
response size may be large.
response size may be large : https://www.misp-project.org/openapi/#tag/Events/operation/getEventById
:param event: event to get
:param deleted: whether to include soft-deleted attributes
@ -387,7 +387,7 @@ class PyMISP:
return self._check_head_response(r)
def add_event(self, event: MISPEvent, pythonify: bool = False, metadata: bool = False) -> Union[Dict, MISPEvent]:
"""Add a new event on a MISP instance
"""Add a new event on a MISP instance: https://www.misp-project.org/openapi/#tag/Events/operation/addEvent
:param event: event to add
:param pythonify: Returns a PyMISP Object instead of the plain json output
@ -403,7 +403,7 @@ class PyMISP:
def update_event(self, event: MISPEvent, event_id: Optional[int] = None, pythonify: bool = False,
metadata: bool = False) -> Union[Dict, MISPEvent]:
"""Update an event on a MISP instance'''
"""Update an event on a MISP instance: https://www.misp-project.org/openapi/#tag/Events/operation/editEvent
:param event: event to update
:param event_id: ID of event to update
@ -423,7 +423,7 @@ class PyMISP:
return e
def delete_event(self, event: Union[MISPEvent, int, str, UUID]) -> Dict:
"""Delete an event from a MISP instance'''
"""Delete an event from a MISP instance: https://www.misp-project.org/openapi/#tag/Events/operation/deleteEvent
:param event: event to delete
"""
@ -432,7 +432,7 @@ class PyMISP:
return self._check_json_response(response)
def publish(self, event: Union[MISPEvent, int, str, UUID], alert: bool = False) -> Dict:
"""Publish the event with one single HTTP POST
"""Publish the event with one single HTTP POST: https://www.misp-project.org/openapi/#tag/Events/operation/publishEvent
:param event: event to publish
:param alert: whether to send an email. The default is to not send a mail as it is assumed this method is called on update.
@ -444,6 +444,15 @@ class PyMISP:
response = self._prepare_request('POST', f'events/publish/{event_id}')
return self._check_json_response(response)
def unpublish(self, event: Union[MISPEvent, int, str, UUID]) -> Dict:
"""Unpublish the event with one single HTTP POST: https://www.misp-project.org/openapi/#tag/Events/operation/unpublishEvent
:param event: event to unpublish
"""
event_id = get_uuid_or_id_from_abstract_misp(event)
response = self._prepare_request('POST', f'events/publish/{event_id}')
return self._check_json_response(response)
def contact_event_reporter(self, event: Union[MISPEvent, int, str, UUID], message: str) -> Dict:
"""Send a message to the reporter of an event
@ -547,7 +556,7 @@ class PyMISP:
# ## BEGIN Object ###
def get_object(self, misp_object: Union[MISPObject, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPObject]:
"""Get an object from the remote MISP instance
"""Get an object from the remote MISP instance: https://www.misp-project.org/openapi/#tag/Objects/operation/getObjectById
:param misp_object: object to get
:param pythonify: Returns a PyMISP Object instead of the plain json output
@ -571,7 +580,7 @@ class PyMISP:
return self._check_head_response(r)
def add_object(self, event: Union[MISPEvent, int, str, UUID], misp_object: MISPObject, pythonify: bool = False, break_on_duplicate: bool = False) -> Union[Dict, MISPObject]:
"""Add a MISP Object to an existing MISP event
"""Add a MISP Object to an existing MISP event: https://www.misp-project.org/openapi/#tag/Objects/operation/addObject
:param event: event to extend
:param misp_object: object to add
@ -608,7 +617,7 @@ class PyMISP:
return o
def delete_object(self, misp_object: Union[MISPObject, int, str, UUID], hard: bool = False) -> Dict:
"""Delete an object from a MISP instance
"""Delete an object from a MISP instance: https://www.misp-project.org/openapi/#tag/Objects/operation/deleteObject
:param misp_object: object to delete
:param hard: flag for hard delete
@ -693,7 +702,7 @@ class PyMISP:
# ## BEGIN Attribute ###
def attributes(self, pythonify: bool = False) -> Union[Dict, List[MISPAttribute]]:
"""Get all the attributes from the MISP instance
"""Get all the attributes from the MISP instance: https://www.misp-project.org/openapi/#tag/Attributes/operation/getAttributes
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
@ -709,7 +718,7 @@ class PyMISP:
return to_return
def get_attribute(self, attribute: Union[MISPAttribute, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPAttribute]:
"""Get an attribute from a MISP instance
"""Get an attribute from a MISP instance: https://www.misp-project.org/openapi/#tag/Attributes/operation/getAttributeById
:param attribute: attribute to get
:param pythonify: Returns a PyMISP Object instead of the plain json output
@ -733,7 +742,7 @@ class PyMISP:
return self._check_head_response(r)
def add_attribute(self, event: Union[MISPEvent, int, str, UUID], attribute: Union[MISPAttribute, Iterable], pythonify: bool = False) -> Union[Dict, MISPAttribute, MISPShadowAttribute]:
"""Add an attribute to an existing MISP event
"""Add an attribute to an existing MISP event: https://www.misp-project.org/openapi/#tag/Attributes/operation/addAttribute
:param event: event to extend
:param attribute: attribute or (MISP version 2.4.113+) list of attributes to add.
@ -778,7 +787,7 @@ class PyMISP:
return a
def update_attribute(self, attribute: MISPAttribute, attribute_id: Optional[int] = None, pythonify: bool = False) -> Union[Dict, MISPAttribute, MISPShadowAttribute]:
"""Update an attribute on a MISP instance
"""Update an attribute on a MISP instance: https://www.misp-project.org/openapi/#tag/Attributes/operation/editAttribute
:param attribute: attribute to update
:param attribute_id: attribute ID to update
@ -803,7 +812,7 @@ class PyMISP:
return a
def delete_attribute(self, attribute: Union[MISPAttribute, int, str, UUID], hard: bool = False) -> Dict:
"""Delete an attribute from a MISP instance
"""Delete an attribute from a MISP instance: https://www.misp-project.org/openapi/#tag/Attributes/operation/deleteAttribute
:param attribute: attribute to delete
:param hard: flag for hard delete
@ -822,6 +831,20 @@ class PyMISP:
return self.delete_attribute_proposal(attribute_id)
return response
def restore_attribute(self, attribute: Union[MISPAttribute, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPAttribute]:
"""Restore a soft deleted attribute from a MISP instance: https://www.misp-project.org/openapi/#tag/Attributes/operation/restoreAttribute
:param attribute: attribute to restore
"""
attribute_id = get_uuid_or_id_from_abstract_misp(attribute)
r = self._prepare_request('POST', f'attributes/restore/{attribute_id}')
response = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in response:
return response
a = MISPAttribute()
a.from_dict(**response)
return a
# ## END Attribute ###
# ## BEGIN Attribute Proposal ###
@ -930,7 +953,7 @@ class PyMISP:
def sightings(self, misp_entity: Optional[AbstractMISP] = None,
org: Optional[Union[MISPOrganisation, int, str, UUID]] = None,
pythonify: bool = False) -> Union[Dict, List[MISPSighting]]:
"""Get the list of sightings related to a MISPEvent or a MISPAttribute (depending on type of misp_entity)
"""Get the list of sightings related to a MISPEvent or a MISPAttribute (depending on type of misp_entity): https://www.misp-project.org/openapi/#tag/Sightings/operation/getSightingsByEventId
:param misp_entity: MISP entity
:param org: MISP organization
@ -964,7 +987,7 @@ class PyMISP:
def add_sighting(self, sighting: MISPSighting,
attribute: Optional[Union[MISPAttribute, int, str, UUID]] = None,
pythonify: bool = False) -> Union[Dict, MISPSighting]:
"""Add a new sighting (globally, or to a specific attribute)
"""Add a new sighting (globally, or to a specific attribute): https://www.misp-project.org/openapi/#tag/Sightings/operation/addSighting and https://www.misp-project.org/openapi/#tag/Sightings/operation/getSightingsByEventId
:param sighting: sighting to add
:param attribute: specific attribute to modify with the sighting
@ -984,7 +1007,7 @@ class PyMISP:
return s
def delete_sighting(self, sighting: Union[MISPSighting, int, str, UUID]) -> Dict:
"""Delete a sighting from a MISP instance
"""Delete a sighting from a MISP instance: https://www.misp-project.org/openapi/#tag/Sightings/operation/deleteSighting
:param sighting: sighting to delete
"""
@ -997,7 +1020,7 @@ class PyMISP:
# ## BEGIN Tags ###
def tags(self, pythonify: bool = False, **kw_params) -> Union[Dict, List[MISPTag]]:
"""Get the list of existing tags.
"""Get the list of existing tags: https://www.misp-project.org/openapi/#tag/Tags/operation/getTags
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
@ -1013,7 +1036,7 @@ class PyMISP:
return to_return
def get_tag(self, tag: Union[MISPTag, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPTag]:
"""Get a tag by id.
"""Get a tag by id: https://www.misp-project.org/openapi/#tag/Tags/operation/getTagById
:param tag: tag to get
:param pythonify: Returns a PyMISP Object instead of the plain json output
@ -1028,7 +1051,7 @@ class PyMISP:
return t
def add_tag(self, tag: MISPTag, pythonify: bool = False) -> Union[Dict, MISPTag]:
"""Add a new tag on a MISP instance.
"""Add a new tag on a MISP instance: https://www.misp-project.org/openapi/#tag/Tags/operation/addTag
The user calling this method needs the Tag Editor permission.
It doesn't add a tag to an event, simply creates it on the MISP instance.
@ -1062,7 +1085,7 @@ class PyMISP:
return self.update_tag(tag, pythonify=pythonify)
def update_tag(self, tag: MISPTag, tag_id: Optional[int] = None, pythonify: bool = False) -> Union[Dict, MISPTag]:
"""Edit only the provided parameters of a tag
"""Edit only the provided parameters of a tag: https://www.misp-project.org/openapi/#tag/Tags/operation/editTag
:param tag: tag to update
:aram tag_id: tag ID to update
@ -1081,7 +1104,7 @@ class PyMISP:
return t
def delete_tag(self, tag: Union[MISPTag, int, str, UUID]) -> Dict:
"""Delete a tag from a MISP instance
"""Delete a tag from a MISP instance: https://www.misp-project.org/openapi/#tag/Tags/operation/deleteTag
:param tag: tag to delete
"""
@ -1090,7 +1113,7 @@ class PyMISP:
return self._check_json_response(response)
def search_tags(self, tagname: str, strict_tagname: bool = False, pythonify: bool = False) -> Union[Dict, List[MISPTag]]:
"""Search for tags by name.
"""Search for tags by name: https://www.misp-project.org/openapi/#tag/Tags/operation/searchTag
:param tag_name: Name to search, use % for substrings matches.
:param strict_tagname: only return tags matching exactly the tag name (so skipping synonyms and cluster's value)
@ -1112,7 +1135,7 @@ class PyMISP:
# ## BEGIN Taxonomies ###
def taxonomies(self, pythonify: bool = False) -> Union[Dict, List[MISPTaxonomy]]:
"""Get all the taxonomies
"""Get all the taxonomies: https://www.misp-project.org/openapi/#tag/Taxonomies/operation/getTaxonomies
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
@ -1128,7 +1151,7 @@ class PyMISP:
return to_return
def get_taxonomy(self, taxonomy: Union[MISPTaxonomy, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPTaxonomy]:
"""Get a taxonomy by id or namespace from a MISP instance
"""Get a taxonomy by id or namespace from a MISP instance: https://www.misp-project.org/openapi/#tag/Taxonomies/operation/getTaxonomyById
:param taxonomy: taxonomy to get
:param pythonify: Returns a PyMISP Object instead of the plain json output
@ -1143,7 +1166,7 @@ class PyMISP:
return t
def enable_taxonomy(self, taxonomy: Union[MISPTaxonomy, int, str, UUID]) -> Dict:
"""Enable a taxonomy
"""Enable a taxonomy: https://www.misp-project.org/openapi/#tag/Taxonomies/operation/enableTaxonomy
:param taxonomy: taxonomy to enable
"""
@ -1152,7 +1175,7 @@ class PyMISP:
return self._check_json_response(response)
def disable_taxonomy(self, taxonomy: Union[MISPTaxonomy, int, str, UUID]) -> Dict:
"""Disable a taxonomy.
"""Disable a taxonomy: https://www.misp-project.org/openapi/#tag/Taxonomies/operation/disableTaxonomy
:param taxonomy: taxonomy to disable
"""
@ -1187,7 +1210,7 @@ class PyMISP:
return self._check_json_response(response)
def update_taxonomies(self) -> Dict:
"""Update all the taxonomies."""
"""Update all the taxonomies: https://www.misp-project.org/openapi/#tag/Taxonomies/operation/updateTaxonomies"""
response = self._prepare_request('POST', 'taxonomies/update')
return self._check_json_response(response)
@ -1207,7 +1230,7 @@ class PyMISP:
# ## BEGIN Warninglists ###
def warninglists(self, pythonify: bool = False) -> Union[Dict, List[MISPWarninglist]]:
"""Get all the warninglists.
"""Get all the warninglists: https://www.misp-project.org/openapi/#tag/Warninglists/operation/getWarninglists
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
@ -1223,7 +1246,7 @@ class PyMISP:
return to_return
def get_warninglist(self, warninglist: Union[MISPWarninglist, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPWarninglist]:
"""Get a warninglist by id
"""Get a warninglist by id: https://www.misp-project.org/openapi/#tag/Warninglists/operation/getWarninglistById
:param warninglist: warninglist to get
:param pythonify: Returns a PyMISP Object instead of the plain json output
@ -1238,7 +1261,7 @@ class PyMISP:
return w
def toggle_warninglist(self, warninglist_id: Optional[Union[str, int, List[int]]] = None, warninglist_name: Optional[Union[str, List[str]]] = None, force_enable: bool = False) -> Dict:
'''Toggle (enable/disable) the status of a warninglist by id
'''Toggle (enable/disable) the status of a warninglist by id: https://www.misp-project.org/openapi/#tag/Warninglists/operation/toggleEnableWarninglist
:param warninglist_id: ID of the WarningList
:param warninglist_name: name of the WarningList
@ -1287,7 +1310,7 @@ class PyMISP:
return self._check_json_response(response)
def update_warninglists(self) -> Dict:
"""Update all the warninglists."""
"""Update all the warninglists: https://www.misp-project.org/openapi/#tag/Warninglists/operation/updateWarninglists"""
response = self._prepare_request('POST', 'warninglists/update')
return self._check_json_response(response)
@ -1296,7 +1319,7 @@ class PyMISP:
# ## BEGIN Noticelist ###
def noticelists(self, pythonify: bool = False) -> Union[Dict, List[MISPNoticelist]]:
"""Get all the noticelists
"""Get all the noticelists: https://www.misp-project.org/openapi/#tag/Noticelists/operation/getNoticelists
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
@ -1312,7 +1335,7 @@ class PyMISP:
return to_return
def get_noticelist(self, noticelist: Union[MISPNoticelist, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPNoticelist]:
"""Get a noticelist by id
"""Get a noticelist by id: https://www.misp-project.org/openapi/#tag/Noticelists/operation/getNoticelistById
:param notistlist: Noticelist to get
:param pythonify: Returns a PyMISP Object instead of the plain json output
@ -1327,7 +1350,7 @@ class PyMISP:
return n
def enable_noticelist(self, noticelist: Union[MISPNoticelist, int, str, UUID]) -> Dict:
"""Enable a noticelist by id
"""Enable a noticelist by id: https://www.misp-project.org/openapi/#tag/Noticelists/operation/toggleEnableNoticelist
:param noticelist: Noticelist to enable
"""
@ -1349,7 +1372,7 @@ class PyMISP:
return self._check_json_response(response)
def update_noticelists(self) -> Dict:
"""Update all the noticelists."""
"""Update all the noticelists: https://www.misp-project.org/openapi/#tag/Noticelists/operation/updateNoticelists"""
response = self._prepare_request('POST', 'noticelists/update')
return self._check_json_response(response)
@ -1421,7 +1444,7 @@ class PyMISP:
# ## BEGIN Galaxy ###
def galaxies(self, pythonify: bool = False) -> Union[Dict, List[MISPGalaxy]]:
"""Get all the galaxies
"""Get all the galaxies: https://www.misp-project.org/openapi/#tag/Galaxies/operation/getGalaxies
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
@ -1437,7 +1460,7 @@ class PyMISP:
return to_return
def get_galaxy(self, galaxy: Union[MISPGalaxy, int, str, UUID], withCluster: bool = False, pythonify: bool = False) -> Union[Dict, MISPGalaxy]:
"""Get a galaxy by id
"""Get a galaxy by id: https://www.misp-project.org/openapi/#tag/Galaxies/operation/getGalaxyById
:param galaxy: galaxy to get
:param withCluster: Include the clusters associated with the galaxy
@ -1452,8 +1475,8 @@ class PyMISP:
g.from_dict(**galaxy_j, withCluster=withCluster)
return g
def search_galaxy_clusters(self, galaxy: Union[MISPGalaxy, int, str, UUID], context: str = "all", searchall: str = None, pythonify: bool = False) -> Union[Dict, List[MISPGalaxyCluster]]:
"""Searches the galaxy clusters within a specific galaxy
def search_galaxy_clusters(self, galaxy: Union[MISPGalaxy, int, str, UUID], context: str = "all", searchall: Optional[str] = None, pythonify: bool = False) -> Union[Dict, List[MISPGalaxyCluster]]:
"""Searches the galaxy clusters within a specific galaxy: https://www.misp-project.org/openapi/#tag/Galaxy-Clusters/operation/getGalaxyClusters and https://www.misp-project.org/openapi/#tag/Galaxy-Clusters/operation/getGalaxyClusterById
:param galaxy: The MISPGalaxy you wish to search in
:param context: The context of how you want to search within the galaxy_
@ -1480,7 +1503,7 @@ class PyMISP:
return response
def update_galaxies(self) -> Dict:
"""Update all the galaxies."""
"""Update all the galaxies: https://www.misp-project.org/openapi/#tag/Galaxies/operation/updateGalaxies"""
response = self._prepare_request('POST', 'galaxies/update')
return self._check_json_response(response)
@ -1501,7 +1524,7 @@ class PyMISP:
return gc
def add_galaxy_cluster(self, galaxy: Union[MISPGalaxy, str, UUID], galaxy_cluster: MISPGalaxyCluster, pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]:
"""Add a new galaxy cluster to a MISP Galaxy
"""Add a new galaxy cluster to a MISP Galaxy: https://www.misp-project.org/openapi/#tag/Galaxy-Clusters/operation/addGalaxyCluster
:param galaxy: A MISPGalaxy (or UUID) where you wish to add the galaxy cluster
:param galaxy_cluster: A MISPGalaxyCluster you wish to add
@ -1521,7 +1544,7 @@ class PyMISP:
return gc
def update_galaxy_cluster(self, galaxy_cluster: MISPGalaxyCluster, pythonify: bool = False) -> Union[Dict, MISPGalaxyCluster]:
"""Update a custom galaxy cluster.
"""Update a custom galaxy cluster: https://www.misp-project.org/openapi/#tag/Galaxy-Clusters/operation/editGalaxyCluster
;param galaxy_cluster: The MISPGalaxyCluster you wish to update
:param pythonify: Returns a PyMISP Object instead of the plain json output
@ -1540,7 +1563,7 @@ class PyMISP:
return gc
def publish_galaxy_cluster(self, galaxy_cluster: Union[MISPGalaxyCluster, int, str, UUID]) -> Dict:
"""Publishes a galaxy cluster
"""Publishes a galaxy cluster: https://www.misp-project.org/openapi/#tag/Galaxy-Clusters/operation/publishGalaxyCluster
:param galaxy_cluster: The galaxy cluster you wish to publish
"""
@ -1576,7 +1599,7 @@ class PyMISP:
return gc
def delete_galaxy_cluster(self, galaxy_cluster: Union[MISPGalaxyCluster, int, str, UUID], hard=False) -> Dict:
"""Deletes a galaxy cluster from MISP
"""Deletes a galaxy cluster from MISP: https://www.misp-project.org/openapi/#tag/Galaxy-Clusters/operation/deleteGalaxyCluster
:param galaxy_cluster: The MISPGalaxyCluster you wish to delete from MISP
:param hard: flag for hard delete
@ -1626,7 +1649,7 @@ class PyMISP:
# ## BEGIN Feed ###
def feeds(self, pythonify: bool = False) -> Union[Dict, List[MISPFeed]]:
"""Get the list of existing feeds
"""Get the list of existing feeds: https://www.misp-project.org/openapi/#tag/Feeds/operation/getFeeds
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
@ -1642,7 +1665,7 @@ class PyMISP:
return to_return
def get_feed(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPFeed]:
"""Get a feed by id
"""Get a feed by id: https://www.misp-project.org/openapi/#tag/Feeds/operation/getFeedById
:param feed: feed to get
:param pythonify: Returns a PyMISP Object instead of the plain json output
@ -1657,7 +1680,7 @@ class PyMISP:
return f
def add_feed(self, feed: MISPFeed, pythonify: bool = False) -> Union[Dict, MISPFeed]:
"""Add a new feed on a MISP instance
"""Add a new feed on a MISP instance: https://www.misp-project.org/openapi/#tag/Feeds/operation/addFeed
:param feed: feed to add
:param pythonify: Returns a PyMISP Object instead of the plain json output
@ -1672,7 +1695,7 @@ class PyMISP:
return f
def enable_feed(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPFeed]:
"""Enable a feed; fetching it will create event(s)
"""Enable a feed; fetching it will create event(s): https://www.misp-project.org/openapi/#tag/Feeds/operation/enableFeed
:param feed: feed to enable
:param pythonify: Returns a PyMISP Object instead of the plain json output
@ -1687,7 +1710,7 @@ class PyMISP:
return self.update_feed(feed=f, pythonify=pythonify)
def disable_feed(self, feed: Union[MISPFeed, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPFeed]:
"""Disable a feed
"""Disable a feed: https://www.misp-project.org/openapi/#tag/Feeds/operation/disableFeed
:param feed: feed to disable
:param pythonify: Returns a PyMISP Object instead of the plain json output
@ -1761,7 +1784,7 @@ class PyMISP:
return self._check_json_response(response)
def fetch_feed(self, feed: Union[MISPFeed, int, str, UUID]) -> Dict:
"""Fetch one single feed by id
"""Fetch one single feed by id: https://www.misp-project.org/openapi/#tag/Feeds/operation/fetchFromFeed
:param feed: feed to fetch
"""
@ -1770,12 +1793,12 @@ class PyMISP:
return self._check_json_response(response)
def cache_all_feeds(self) -> Dict:
""" Cache all the feeds"""
""" Cache all the feeds: https://www.misp-project.org/openapi/#tag/Feeds/operation/cacheFeeds"""
response = self._prepare_request('GET', 'feeds/cacheFeeds/all')
return self._check_json_response(response)
def cache_feed(self, feed: Union[MISPFeed, int, str, UUID]) -> Dict:
"""Cache a specific feed by id
"""Cache a specific feed by id: https://www.misp-project.org/openapi/#tag/Feeds/operation/cacheFeeds
:param feed: feed to cache
"""
@ -1808,7 +1831,7 @@ class PyMISP:
# ## BEGIN Server ###
def servers(self, pythonify: bool = False) -> Union[Dict, List[MISPServer]]:
"""Get the existing servers the MISP instance can synchronise with
"""Get the existing servers the MISP instance can synchronise with: https://www.misp-project.org/openapi/#tag/Servers/operation/getServers
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
@ -1852,7 +1875,7 @@ class PyMISP:
return s
def add_server(self, server: MISPServer, pythonify: bool = False) -> Union[Dict, MISPServer]:
"""Add a server to synchronise with.
"""Add a server to synchronise with: https://www.misp-project.org/openapi/#tag/Servers/operation/getServers
Note: You probably want to use PyMISP.get_sync_config and PyMISP.import_server instead
:param server: sync server config
@ -1867,7 +1890,7 @@ class PyMISP:
return s
def update_server(self, server: MISPServer, server_id: Optional[int] = None, pythonify: bool = False) -> Union[Dict, MISPServer]:
"""Update a server to synchronise with
"""Update a server to synchronise with: https://www.misp-project.org/openapi/#tag/Servers/operation/getServers
:param server: sync server config
:param pythonify: Returns a PyMISP Object instead of the plain json output
@ -1885,7 +1908,7 @@ class PyMISP:
return s
def delete_server(self, server: Union[MISPServer, int, str, UUID]) -> Dict:
"""Delete a sync server
"""Delete a sync server: https://www.misp-project.org/openapi/#tag/Servers/operation/getServers
:param server: sync server config
"""
@ -1894,7 +1917,7 @@ class PyMISP:
return self._check_json_response(response)
def server_pull(self, server: Union[MISPServer, int, str, UUID], event: Optional[Union[MISPEvent, int, str, UUID]] = None) -> Dict:
"""Initialize a pull from a sync server, optionally limited to one event
"""Initialize a pull from a sync server, optionally limited to one event: https://www.misp-project.org/openapi/#tag/Servers/operation/pullServer
:param server: sync server config
:param event: event
@ -1910,7 +1933,7 @@ class PyMISP:
return self._check_json_response(response)
def server_push(self, server: Union[MISPServer, int, str, UUID], event: Optional[Union[MISPEvent, int, str, UUID]] = None) -> Dict:
"""Initialize a push to a sync server, optionally limited to one event
"""Initialize a push to a sync server, optionally limited to one event: https://www.misp-project.org/openapi/#tag/Servers/operation/pushServer
:param server: sync server config
:param event: event
@ -1939,7 +1962,7 @@ class PyMISP:
# ## BEGIN Sharing group ###
def sharing_groups(self, pythonify: bool = False) -> Union[Dict, List[MISPSharingGroup]]:
"""Get the existing sharing groups
"""Get the existing sharing groups: https://www.misp-project.org/openapi/#tag/Sharing-Groups/operation/getSharingGroup
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
@ -1955,7 +1978,7 @@ class PyMISP:
return to_return
def get_sharing_group(self, sharing_group: Union[MISPSharingGroup, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPSharingGroup]:
"""Get a sharing group
"""Get a sharing group: https://www.misp-project.org/openapi/#tag/Sharing-Groups/operation/getSharingGroupById
:param sharing_group: sharing group to find
:param pythonify: Returns a PyMISP Object instead of the plain json output
@ -1970,7 +1993,7 @@ class PyMISP:
return s
def add_sharing_group(self, sharing_group: MISPSharingGroup, pythonify: bool = False) -> Union[Dict, MISPSharingGroup]:
"""Add a new sharing group
"""Add a new sharing group: https://www.misp-project.org/openapi/#tag/Sharing-Groups/operation/addSharingGroup
:param sharing_group: sharing group to add
:param pythonify: Returns a PyMISP Object instead of the plain json output
@ -1984,7 +2007,7 @@ class PyMISP:
return s
def update_sharing_group(self, sharing_group: Union[MISPSharingGroup, dict], sharing_group_id: Optional[int] = None, pythonify: bool = False) -> Union[Dict, MISPSharingGroup]:
"""Update sharing group parameters
"""Update sharing group parameters: https://www.misp-project.org/openapi/#tag/Sharing-Groups/operation/editSharingGroup
:param sharing_group: MISP Sharing Group
:param sharing_group_id Sharing group ID
@ -2012,7 +2035,7 @@ class PyMISP:
return self._check_head_response(r)
def delete_sharing_group(self, sharing_group: Union[MISPSharingGroup, int, str, UUID]) -> Dict:
"""Delete a sharing group
"""Delete a sharing group: https://www.misp-project.org/openapi/#tag/Sharing-Groups/operation/deleteSharingGroup
:param sharing_group: sharing group to delete
"""
@ -2022,7 +2045,7 @@ class PyMISP:
def add_org_to_sharing_group(self, sharing_group: Union[MISPSharingGroup, int, str, UUID],
organisation: Union[MISPOrganisation, int, str, UUID], extend: bool = False) -> Dict:
'''Add an organisation to a sharing group.
'''Add an organisation to a sharing group: https://www.misp-project.org/openapi/#tag/Sharing-Groups/operation/addOrganisationToSharingGroup
:param sharing_group: Sharing group's local instance ID, or Sharing group's global UUID
:param organisation: Organisation's local instance ID, or Organisation's global UUID, or Organisation's name as known to the curent instance
@ -2036,7 +2059,7 @@ class PyMISP:
def remove_org_from_sharing_group(self, sharing_group: Union[MISPSharingGroup, int, str, UUID],
organisation: Union[MISPOrganisation, int, str, UUID]) -> Dict:
'''Remove an organisation from a sharing group.
'''Remove an organisation from a sharing group: https://www.misp-project.org/openapi/#tag/Sharing-Groups/operation/removeOrganisationFromSharingGroup
:param sharing_group: Sharing group's local instance ID, or Sharing group's global UUID
:param organisation: Organisation's local instance ID, or Organisation's global UUID, or Organisation's name as known to the curent instance
@ -2049,7 +2072,7 @@ class PyMISP:
def add_server_to_sharing_group(self, sharing_group: Union[MISPSharingGroup, int, str, UUID],
server: Union[MISPServer, int, str, UUID], all_orgs: bool = False) -> Dict:
'''Add a server to a sharing group.
'''Add a server to a sharing group: https://www.misp-project.org/openapi/#tag/Sharing-Groups/operation/addServerToSharingGroup
:param sharing_group: Sharing group's local instance ID, or Sharing group's global UUID
:param server: Server's local instance ID, or URL of the Server, or Server's name as known to the curent instance
@ -2063,7 +2086,7 @@ class PyMISP:
def remove_server_from_sharing_group(self, sharing_group: Union[MISPSharingGroup, int, str, UUID],
server: Union[MISPServer, int, str, UUID]) -> Dict:
'''Remove a server from a sharing group.
'''Remove a server from a sharing group: https://www.misp-project.org/openapi/#tag/Sharing-Groups/operation/removeServerFromSharingGroup
:param sharing_group: Sharing group's local instance ID, or Sharing group's global UUID
:param server: Server's local instance ID, or URL of the Server, or Server's name as known to the curent instance
@ -2078,8 +2101,8 @@ class PyMISP:
# ## BEGIN Organisation ###
def organisations(self, scope="local", search: str = None, pythonify: bool = False) -> Union[Dict, List[MISPOrganisation]]:
"""Get all the organisations
def organisations(self, scope="local", search: Optional[str] = None, pythonify: bool = False) -> Union[Dict, List[MISPOrganisation]]:
"""Get all the organisations: https://www.misp-project.org/openapi/#tag/Organisations/operation/getOrganisations
:param scope: scope of organizations to get
:param search: The search to make against the list of organisations
@ -2101,7 +2124,7 @@ class PyMISP:
return to_return
def get_organisation(self, organisation: Union[MISPOrganisation, int, str, UUID], pythonify: bool = False) -> Union[Dict, MISPOrganisation]:
"""Get an organisation by id
"""Get an organisation by id: https://www.misp-project.org/openapi/#tag/Organisations/operation/getOrganisationById
:param organisation: organization to get
:param pythonify: Returns a PyMISP Object instead of the plain json output
@ -2125,7 +2148,7 @@ class PyMISP:
return self._check_head_response(r)
def add_organisation(self, organisation: MISPOrganisation, pythonify: bool = False) -> Union[Dict, MISPOrganisation]:
"""Add an organisation
"""Add an organisation: https://www.misp-project.org/openapi/#tag/Organisations/operation/addOrganisation
:param organisation: organization to add
:param pythonify: Returns a PyMISP Object instead of the plain json output
@ -2139,7 +2162,7 @@ class PyMISP:
return o
def update_organisation(self, organisation: MISPOrganisation, organisation_id: Optional[int] = None, pythonify: bool = False) -> Union[Dict, MISPOrganisation]:
"""Update an organisation
"""Update an organisation: https://www.misp-project.org/openapi/#tag/Organisations/operation/editOrganisation
:param organisation: organization to update
:param organisation_id: id to update
@ -2158,7 +2181,7 @@ class PyMISP:
return o
def delete_organisation(self, organisation: Union[MISPOrganisation, int, str, UUID]) -> Dict:
"""Delete an organisation by id
"""Delete an organisation by id: https://www.misp-project.org/openapi/#tag/Organisations/operation/deleteOrganisation
:param organisation: organization to delete
"""
@ -2171,8 +2194,8 @@ class PyMISP:
# ## BEGIN User ###
def users(self, search: str = None, organisation: int = None, pythonify: bool = False) -> Union[Dict, List[MISPUser]]:
"""Get all the users, or a filtered set of users
def users(self, search: Optional[str] = None, organisation: Optional[int] = None, pythonify: bool = False) -> Union[Dict, List[MISPUser]]:
"""Get all the users, or a filtered set of users: https://www.misp-project.org/openapi/#tag/Users/operation/getUsers
:param search: The search to make against the list of users
:param organisation: The ID of an organisation to filter against
@ -2196,7 +2219,7 @@ class PyMISP:
return to_return
def get_user(self, user: Union[MISPUser, int, str, UUID] = 'me', pythonify: bool = False, expanded: bool = False) -> Union[Dict, MISPUser, Tuple[MISPUser, MISPRole, List[MISPUserSetting]]]:
"""Get a user by id
"""Get a user by id: https://www.misp-project.org/openapi/#tag/Users/operation/getUsers
:param user: user to get; `me` means the owner of the API key doing the query
:param pythonify: Returns a PyMISP Object instead of the plain json output
@ -2223,7 +2246,7 @@ class PyMISP:
return u, role, usersettings
def get_new_authkey(self, user: Union[MISPUser, int, str, UUID] = 'me') -> str:
'''Get a new authorization key for a specific user, defaults to user doing the call.
'''Get a new authorization key for a specific user, defaults to user doing the call: https://www.misp-project.org/openapi/#tag/AuthKeys/operation/addAuthKey
:param user: The owner of the key
'''
@ -2236,7 +2259,7 @@ class PyMISP:
raise PyMISPUnexpectedResponse(f'Unable to get authkey: {authkey}')
def add_user(self, user: MISPUser, pythonify: bool = False) -> Union[Dict, MISPUser]:
"""Add a new user
"""Add a new user: https://www.misp-project.org/openapi/#tag/Users/operation/addUser
:param user: user to add
:param pythonify: Returns a PyMISP Object instead of the plain json output
@ -2250,7 +2273,7 @@ class PyMISP:
return u
def update_user(self, user: MISPUser, user_id: Optional[int] = None, pythonify: bool = False) -> Union[Dict, MISPUser]:
"""Update a user on a MISP instance
"""Update a user on a MISP instance: https://www.misp-project.org/openapi/#tag/Users/operation/editUser
:param user: user to update
:param user_id: id to update
@ -2272,7 +2295,7 @@ class PyMISP:
return e
def delete_user(self, user: Union[MISPUser, int, str, UUID]) -> Dict:
"""Delete a user by id
"""Delete a user by id: https://www.misp-project.org/openapi/#tag/Users/operation/deleteUser
:param user: user to delete
"""
@ -2282,7 +2305,7 @@ class PyMISP:
return self._check_json_response(response)
def change_user_password(self, new_password: str) -> Dict:
"""Change the password of the curent user
"""Change the password of the curent user:
:param new_password: password to set
"""
@ -2397,6 +2420,49 @@ class PyMISP:
# ## END Role ###
# ## BEGIN Decaying Models ###
def update_decaying_models(self) -> Dict:
"""Update all the Decaying models"""
response = self._prepare_request('POST', 'decayingModel/update')
return self._check_json_response(response)
def decaying_models(self, pythonify: bool = False) -> Union[Dict, List[MISPDecayingModel]]:
"""Get all the decaying models
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output
"""
r = self._prepare_request('GET', 'decayingModel/index')
models = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in models:
return models
to_return = []
for model in models:
n = MISPDecayingModel()
n.from_dict(**model)
to_return.append(n)
return to_return
def enable_decaying_model(self, decaying_model: Union[MISPDecayingModel, int, str]) -> Dict:
"""Enable a decaying Model"""
if isinstance(decaying_model, MISPDecayingModel):
decaying_model_id = decaying_model.id
else:
decaying_model_id = int(decaying_model)
response = self._prepare_request('POST', f'decayingModel/enable/{decaying_model_id}')
return self._check_json_response(response)
def disable_decaying_model(self, decaying_model: Union[MISPDecayingModel, int, str]) -> Dict:
"""Disable a decaying Model"""
if isinstance(decaying_model, MISPDecayingModel):
decaying_model_id = decaying_model.id
else:
decaying_model_id = int(decaying_model)
response = self._prepare_request('POST', f'decayingModel/disable/{decaying_model_id}')
return self._check_json_response(response)
# ## END Decaying Models ###
# ## BEGIN Search methods ###
def search(self, controller: str = 'events', return_format: str = 'json',
@ -2449,6 +2515,12 @@ class PyMISP:
'''Search in the MISP instance
:param controller: Controller to search on, it can be `events`, `objects`, `attributes`. The response will either be a list of events, objects, or attributes.
Reference documentation for each controller:
* events: https://www.misp-project.org/openapi/#tag/Events/operation/restSearchEvents
* attributes: https://www.misp-project.org/openapi/#tag/Attributes/operation/restSearchAttributes
* objects: N/A
:param return_format: Set the return format of the search (Currently supported: json, xml, openioc, suricata, snort - more formats are being moved to restSearch with the goal being that all searches happen through this API). Can be passed as the first parameter after restSearch or via the JSON payload.
:param limit: Limit the number of results returned, depending on the scope (for example 10 attributes or 10 full events).
:param page: If a limit is set, sets the page to be returned. page 3, limit 100 will return records 201->300).
@ -3179,7 +3251,7 @@ class PyMISP:
# ## BEGIN User Settings ###
def user_settings(self, pythonify: bool = False) -> Union[Dict, List[MISPUserSetting]]:
"""Get all the user settings
"""Get all the user settings: https://www.misp-project.org/openapi/#tag/UserSettings/operation/getUserSettings
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
@ -3196,7 +3268,7 @@ class PyMISP:
def get_user_setting(self, user_setting: str, user: Optional[Union[MISPUser, int, str, UUID]] = None,
pythonify: bool = False) -> Union[Dict, MISPUserSetting]:
"""Get a user setting
"""Get a user setting: https://www.misp-project.org/openapi/#tag/UserSettings/operation/getUserSettingById
:param user_setting: name of user setting
:param user: user
@ -3215,7 +3287,7 @@ class PyMISP:
def set_user_setting(self, user_setting: str, value: Union[str, dict], user: Optional[Union[MISPUser, int, str, UUID]] = None,
pythonify: bool = False) -> Union[Dict, MISPUserSetting]:
"""Set a user setting
"""Set a user setting: https://www.misp-project.org/openapi/#tag/UserSettings/operation/setUserSetting
:param user_setting: name of user setting
:param value: value to set
@ -3237,7 +3309,7 @@ class PyMISP:
return u
def delete_user_setting(self, user_setting: str, user: Optional[Union[MISPUser, int, str, UUID]] = None) -> Dict:
"""Delete a user setting
"""Delete a user setting: https://www.misp-project.org/openapi/#tag/UserSettings/operation/deleteUserSettingById
:param user_setting: name of user setting
:param user: user
@ -3420,8 +3492,10 @@ class PyMISP:
"""
uuid = get_uuid_or_id_from_abstract_misp(misp_entity)
if isinstance(tag, MISPTag):
tag = tag.name
to_post = {'uuid': uuid, 'tag': tag, 'local': local}
tag_name = tag.name if 'name' in tag else ""
else:
tag_name = tag
to_post = {'uuid': uuid, 'tag': tag_name, 'local': local}
response = self._prepare_request('POST', 'tags/attachTagToObject', data=to_post)
return self._check_json_response(response)
@ -3433,8 +3507,7 @@ class PyMISP:
"""
uuid = get_uuid_or_id_from_abstract_misp(misp_entity)
if isinstance(tag, MISPTag):
if 'name' in tag:
tag_name = tag.name
tag_name = tag.name if 'name' in tag else ""
else:
tag_name = tag
to_post = {'uuid': uuid, 'tag': tag_name}

File diff suppressed because it is too large Load Diff

@ -1 +1 @@
Subproject commit 34ed3309e0392a1957d8dd493c5b4e3c32f9e503
Subproject commit fd603be3283953b68ed48ede7afd2e19f43577ac

View File

@ -65,6 +65,10 @@ class UnknownMISPObjectTemplate(MISPObjectException):
pass
class InvalidMISPGalaxy(PyMISPError):
pass
class PyMISPInvalidFormat(PyMISPError):
pass

View File

@ -17,7 +17,9 @@ from pathlib import Path
from typing import List, Optional, Union, IO, Dict, Any
from .abstract import AbstractMISP, MISPTag
from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError, NewEventReportError, NewGalaxyClusterError, NewGalaxyClusterRelationError
from .exceptions import (UnknownMISPObjectTemplate, InvalidMISPGalaxy, InvalidMISPObject,
PyMISPError, NewEventError, NewAttributeError, NewEventReportError,
NewGalaxyClusterError, NewGalaxyClusterRelationError)
logger = logging.getLogger('pymisp')
@ -109,7 +111,7 @@ class MISPOrganisation(AbstractMISP):
_fields_for_feed: set = {'name', 'uuid'}
def __init__(self):
def __init__(self) -> None:
super().__init__()
self.id: int
self.name: str
@ -128,7 +130,7 @@ class MISPOrganisation(AbstractMISP):
class MISPSharingGroupOrg(AbstractMISP):
_fields_for_feed: set = {'extend', 'Organisation'}
def __init__(self):
def __init__(self) -> None:
super().__init__()
self.extend: bool
self.Organisation: MISPOrganisation
@ -155,7 +157,7 @@ class MISPSharingGroupOrg(AbstractMISP):
class MISPSharingGroup(AbstractMISP):
_fields_for_feed: set = {'uuid', 'name', 'roaming', 'created', 'organisation_uuid', 'Organisation', 'SharingGroupOrg', 'SharingGroupServer'}
def __init__(self):
def __init__(self) -> None:
super().__init__()
self.name: str
self.SharingGroupOrg: List[MISPSharingGroupOrg] = []
@ -203,7 +205,7 @@ class MISPSharingGroup(AbstractMISP):
class MISPShadowAttribute(AbstractMISP):
def __init__(self):
def __init__(self) -> None:
super().__init__()
self.type: str
self.value: str
@ -221,7 +223,7 @@ class MISPShadowAttribute(AbstractMISP):
class MISPSighting(AbstractMISP):
def __init__(self):
def __init__(self) -> None:
super().__init__()
self.id: int
self.value: str
@ -277,6 +279,7 @@ class MISPAttribute(AbstractMISP):
self.SharingGroup: MISPSharingGroup
self.Sighting: List[MISPSighting] = []
self.Tag: List[MISPTag] = []
self.Galaxy: List[MISPGalaxy] = []
# For search
self.Event: MISPEvent
@ -298,6 +301,27 @@ class MISPAttribute(AbstractMISP):
"""Set a list of prepared MISPTag."""
super()._set_tags(tags)
def add_galaxy(self, galaxy: Union['MISPGalaxy', dict, None] = None, **kwargs) -> 'MISPGalaxy':
"""Add a galaxy to the Attribute, either by passing a MISPGalaxy or a dictionary"""
if isinstance(galaxy, MISPGalaxy):
self.galaxies.append(galaxy)
return galaxy
if isinstance(galaxy, dict):
misp_galaxy = MISPGalaxy()
misp_galaxy.from_dict(**galaxy)
elif kwargs:
misp_galaxy = MISPGalaxy()
misp_galaxy.from_dict(**kwargs)
else:
raise InvalidMISPGalaxy("A Galaxy to add to an existing Attribute needs to be either a MISPGalaxy or a plain python dictionary")
self.galaxies.append(misp_galaxy)
return misp_galaxy
@property
def galaxies(self) -> List['MISPGalaxy']:
"""Returns a list of galaxies associated to this Attribute"""
return self.Galaxy
def _prepare_data(self, data: Optional[Union[Path, str, bytes, BytesIO]]):
if not data:
super().__setattr__('data', None)
@ -588,6 +612,8 @@ class MISPAttribute(AbstractMISP):
if kwargs.get('Tag'):
[self.add_tag(tag) for tag in kwargs.pop('Tag')]
if kwargs.get('Galaxy'):
[self.add_galaxy(galaxy) for galaxy in kwargs.pop('Galaxy')]
if kwargs.get('Sighting'):
[self.add_sighting(sighting) for sighting in kwargs.pop('Sighting')]
if kwargs.get('ShadowAttribute'):
@ -678,7 +704,7 @@ class MISPObjectReference(AbstractMISP):
_fields_for_feed: set = {'uuid', 'timestamp', 'relationship_type', 'comment',
'object_uuid', 'referenced_uuid'}
def __init__(self):
def __init__(self) -> None:
super().__init__()
self.uuid = str(uuid.uuid4())
self.object_uuid: str
@ -1202,15 +1228,10 @@ class MISPGalaxyClusterRelation(AbstractMISP):
Creating a new galaxy cluster can take the following parameters
:param galaxy_cluster_uuid: The UUID of the galaxy the relation links to
:type galaxy_cluster_uuid: uuid
:param referenced_galaxy_cluster_type: The relation type, e.g. dropped-by
:type referenced_galaxy_cluster_type: str
:param referenced_galaxy_cluster_uuid: The UUID of the related galaxy
:type referenced_galaxy_cluster_uuid: uuid
:param distribution: The distribution of the relation, one of 0, 1, 2, 3, 4, default 0
:type distribution: int
:param sharing_group_id: The sharing group of the relation, only when distribution is 4
:type sharing_group_id: int, optional
"""
def __repr__(self) -> str:
@ -1218,10 +1239,10 @@ class MISPGalaxyClusterRelation(AbstractMISP):
return '<{self.__class__.__name__}(referenced_galaxy_cluster_type={self.referenced_galaxy_cluster_type})'.format(self=self)
return '<{self.__class__.__name__}(NotInitialized)'.format(self=self)
def __init__(self):
def __init__(self) -> None:
super().__init__()
self.galaxy_cluster_uuid: uuid
self.referenced_galaxy_cluster_uuid: uuid
self.galaxy_cluster_uuid: str
self.referenced_galaxy_cluster_uuid: str
self.distribution: int = 0
self.referenced_galaxy_cluster_type: str
self.Tag: List[MISPTag] = []
@ -1298,11 +1319,11 @@ class MISPGalaxyCluster(AbstractMISP):
:type cluster_relations: list[MISPGalaxyClusterRelation], optional
"""
def __init__(self):
def __init__(self) -> None:
super().__init__()
self.Galaxy: MISPGalaxy
self.GalaxyElement: List[MISPGalaxyClusterElement] = []
self.meta = {}
self.meta: Dict = {}
self.GalaxyClusterRelation: List[MISPGalaxyClusterRelation] = []
self.Org: MISPOrganisation
self.Orgc: MISPOrganisation
@ -1411,7 +1432,7 @@ class MISPGalaxyCluster(AbstractMISP):
self.cluster_elements.append(cluster_element)
return cluster_element
def add_cluster_relation(self, referenced_galaxy_cluster_uuid: Union["MISPGalaxyCluster", str, UUID], referenced_galaxy_cluster_type: str, galaxy_cluster_uuid: str = None, **kwargs) -> MISPGalaxyClusterRelation:
def add_cluster_relation(self, referenced_galaxy_cluster_uuid: Union["MISPGalaxyCluster", str, UUID], referenced_galaxy_cluster_type: str, galaxy_cluster_uuid: Optional[str] = None, **kwargs: Dict) -> MISPGalaxyClusterRelation:
"""Add a cluster relation to a MISPGalaxyCluster.
:param referenced_galaxy_cluster_uuid: UUID of the related cluster
@ -1447,7 +1468,7 @@ class MISPGalaxyCluster(AbstractMISP):
class MISPGalaxy(AbstractMISP):
"""Galaxy class, used to view a galaxy and respective clusters"""
def __init__(self):
def __init__(self) -> None:
super().__init__()
self.GalaxyCluster: List[MISPGalaxyCluster] = []
self.name: str
@ -1950,13 +1971,23 @@ class MISPEvent(AbstractMISP):
self.edited = True
return event_report
def add_galaxy(self, **kwargs) -> MISPGalaxy:
"""Add a MISP galaxy and sub-clusters into an event.
def add_galaxy(self, galaxy: Union[MISPGalaxy, dict, None] = None, **kwargs) -> MISPGalaxy:
"""Add a galaxy and sub-clusters into an event, either by passing
a MISPGalaxy or a dictionary.
Supports all other parameters supported by MISPGalaxy"""
galaxy = MISPGalaxy()
galaxy.from_dict(**kwargs)
self.galaxies.append(galaxy)
return galaxy
if isinstance(galaxy, MISPGalaxy):
self.galaxies.append(galaxy)
return galaxy
if isinstance(galaxy, dict):
misp_galaxy = MISPGalaxy()
misp_galaxy.from_dict(**galaxy)
elif kwargs:
misp_galaxy = MISPGalaxy()
misp_galaxy.from_dict(**kwargs)
else:
raise InvalidMISPGalaxy("A Galaxy to add to an existing Event needs to be either a MISPGalaxy or a plain python dictionary")
self.galaxies.append(misp_galaxy)
return misp_galaxy
def get_object_by_id(self, object_id: Union[str, int]) -> MISPObject:
"""Get an object by ID
@ -2122,7 +2153,7 @@ class MISPObjectTemplate(AbstractMISP):
class MISPUser(AbstractMISP):
def __init__(self, **kwargs):
def __init__(self, **kwargs: Dict) -> None:
super().__init__(**kwargs)
self.email: str
@ -2193,7 +2224,7 @@ class MISPCorrelationExclusion(AbstractMISP):
class MISPRole(AbstractMISP):
def __init__(self, **kwargs):
def __init__(self, **kwargs: Dict) -> None:
super().__init__(**kwargs)
self.perm_admin: int
self.perm_site_admin: int
@ -2214,7 +2245,7 @@ class MISPServer(AbstractMISP):
class MISPLog(AbstractMISP):
def __init__(self, **kwargs):
def __init__(self, **kwargs: Dict) -> None:
super().__init__(**kwargs)
self.model: str
self.action: str
@ -2231,7 +2262,7 @@ class MISPLog(AbstractMISP):
class MISPEventDelegation(AbstractMISP):
def __init__(self, **kwargs):
def __init__(self, **kwargs: Dict) -> None:
super().__init__(**kwargs)
self.org_id: int
self.requester_org_id: int
@ -2313,7 +2344,7 @@ class MISPUserSetting(AbstractMISP):
class MISPInbox(AbstractMISP):
def __init__(self, **kwargs):
def __init__(self, **kwargs: Dict) -> None:
super().__init__(**kwargs)
self.data: Dict
@ -2328,7 +2359,7 @@ class MISPInbox(AbstractMISP):
class MISPEventBlocklist(AbstractMISP):
def __init__(self, **kwargs):
def __init__(self, **kwargs: Dict) -> None:
super().__init__(**kwargs)
self.event_uuid: str
@ -2343,7 +2374,7 @@ class MISPEventBlocklist(AbstractMISP):
class MISPOrganisationBlocklist(AbstractMISP):
def __init__(self, **kwargs):
def __init__(self, **kwargs: Dict) -> None:
super().__init__(**kwargs)
self.org_uuid: str
@ -2354,3 +2385,19 @@ class MISPOrganisationBlocklist(AbstractMISP):
def __repr__(self):
return f'<{self.__class__.__name__}(org_uuid={self.org_uuid}'
class MISPDecayingModel(AbstractMISP):
def __init__(self, **kwargs: Dict) -> None:
super().__init__(**kwargs)
self.uuid: str
self.id: int
def from_dict(self, **kwargs):
if 'DecayingModel' in kwargs:
kwargs = kwargs['DecayingModel']
super().from_dict(**kwargs)
def __repr__(self):
return f'<{self.__class__.__name__}(uuid={self.uuid})>'

View File

@ -6,7 +6,7 @@ from ..exceptions import InvalidMISPObject
from io import BytesIO
from hashlib import md5, sha1, sha256, sha512
import logging
from typing import Union
from typing import Union, Optional
from pathlib import Path
from . import FileObject
@ -32,7 +32,7 @@ def make_elf_objects(lief_parsed: lief.Binary, misp_file: FileObject, standalone
class ELFObject(AbstractMISPObjectGenerator):
def __init__(self, parsed: lief.ELF.Binary = None, filepath: Union[Path, str] = None, pseudofile: Union[BytesIO, bytes] = None, **kwargs):
def __init__(self, parsed: Optional[lief.ELF.Binary] = None, filepath: Optional[Union[Path, str]] = None, pseudofile: Optional[Union[BytesIO, bytes]] = None, **kwargs):
"""Creates an ELF object, with lief"""
super().__init__('elf', **kwargs)
if not HAS_PYDEEP:

View File

@ -9,10 +9,10 @@ from email import policy, message_from_bytes
from email.message import EmailMessage
from io import BytesIO
from pathlib import Path
from typing import Union, List, Tuple, Dict, cast, Any
from typing import Union, List, Tuple, Dict, cast, Any, Optional
from extract_msg import openMsg
from extract_msg.message import Message as MsgObj
from extract_msg import openMsg # type: ignore
from extract_msg.message import Message as MsgObj # type: ignore
from RTFDE.exceptions import MalformedEncapsulatedRtf, NotEncapsulatedRtf # type: ignore
from RTFDE.deencapsulate import DeEncapsulator # type: ignore
from oletools.common.codepages import codepage2codec # type: ignore
@ -28,7 +28,7 @@ class MISPMsgConverstionError(MISPObjectException):
class EMailObject(AbstractMISPObjectGenerator):
def __init__(self, filepath: Union[Path, str] = None, pseudofile: BytesIO = None,
def __init__(self, filepath: Optional[Union[Path, str]]=None, pseudofile: Optional[BytesIO]=None,
attach_original_email: bool = True, **kwargs):
super().__init__('email', **kwargs)
@ -80,8 +80,8 @@ class EMailObject(AbstractMISPObjectGenerator):
raise PyMISPNotImplementedYet("EmailObject does not know how to decode data passed to it. Object may not be an email. If this is an email please submit it as an issue to PyMISP so we can add support.")
@staticmethod
def create_pseudofile(filepath: Union[Path, str] = None,
pseudofile: BytesIO = None) -> BytesIO:
def create_pseudofile(filepath: Optional[Union[Path, str]] = None,
pseudofile: Optional[BytesIO] = None) -> BytesIO:
"""Creates a pseudofile using directly passed data or data loaded from file path.
"""
if filepath:
@ -111,7 +111,7 @@ class EMailObject(AbstractMISPObjectGenerator):
"cte": "base64"}
if msg_obj.htmlBody is not None:
try:
_html_encoding_raw = msg_obj.mainProperties['3FDE0003'].value
_html_encoding_raw = msg_obj.props['3FDE0003'].value
_html_encoding = codepage2codec(_html_encoding_raw)
except KeyError:
_html_encoding = msg_obj.stringEncoding

View File

@ -10,7 +10,7 @@ import math
from collections import Counter
import logging
from pathlib import Path
from typing import Union
from typing import Union, Optional
logger = logging.getLogger('pymisp')
@ -30,7 +30,7 @@ except ImportError:
class FileObject(AbstractMISPObjectGenerator):
def __init__(self, filepath: Union[Path, str] = None, pseudofile: BytesIO = None, filename: str = None, **kwargs):
def __init__(self, filepath: Optional[Union[Path, str]] = None, pseudofile: Optional[BytesIO] = None, filename: Optional[str] = None, **kwargs) -> None:
super().__init__('file', **kwargs)
if not HAS_PYDEEP:
logger.warning("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git")

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "pymisp"
version = "2.4.162.1"
version = "2.4.168"
description = "Python API for MISP."
authors = ["Raphaël Vinot <raphael.vinot@circl.lu>"]
license = "BSD-2-Clause"
@ -18,9 +18,10 @@ classifiers=[
'Intended Audience :: Science/Research',
'Intended Audience :: Telecommunications Industry',
'Intended Audience :: Information Technology',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Topic :: Security',
'Topic :: Internet'
]
@ -41,26 +42,25 @@ include = [
"Source" = "https://github.com/MISP/PyMISP"
[tool.poetry.dependencies]
python = "^3.7"
requests = "^2.28.1"
python = "^3.8"
requests = "^2.28.2"
python-dateutil = "^2.8.2"
jsonschema = "^4.16.0"
jsonschema = "^4.17.3"
deprecated = "^1.2.13"
extract_msg = {version = "^0.36.4", optional = true}
extract_msg = {version = "^0.39.0", optional = true}
RTFDE = {version = "^0.0.2", optional = true}
oletools = {version = "^0.60.1", optional = true}
python-magic = {version = "^0.4.27", optional = true}
pydeep2 = {version = "^0.5.1", optional = true}
lief = {version = "^0.12.2", optional = true}
lief = {version = "^0.12.3", optional = true}
beautifulsoup4 = {version = "^4.11.1", optional = true}
validators = {version = "^0.20.0", optional = true}
sphinx-autodoc-typehints = {version = "^1.19.4", optional = true}
sphinx-autodoc-typehints = {version = "^1.21.8", optional = true}
recommonmark = {version = "^0.7.1", optional = true}
reportlab = {version = "^3.6.11", optional = true}
reportlab = {version = "^3.6.12", optional = true}
pyfaup = {version = "^1.2", optional = true}
publicsuffixlist = {version = "^0.9.0", optional = true}
chardet = {version = "^5.0.0", optional = true}
urllib3 = {extras = ["brotli"], version = "^1.26.12", optional = true}
publicsuffixlist = {version = "^0.9.2", optional = true}
urllib3 = {extras = ["brotli"], version = "^1.26.14", optional = true}
[tool.poetry.extras]
fileobjects = ['python-magic', 'pydeep2', 'lief']
@ -68,18 +68,18 @@ openioc = ['beautifulsoup4']
virustotal = ['validators']
docs = ['sphinx-autodoc-typehints', 'recommonmark']
pdfexport = ['reportlab']
url = ['pyfaup', 'chardet']
url = ['pyfaup']
email = ['extract_msg', "RTFDE", "oletools"]
brotli = ['urllib3']
[tool.poetry.group.dev.dependencies]
requests-mock = "^1.10.0"
mypy = "^0.982"
ipython = "^7.34.0"
jupyterlab = "^3.4.8"
types-requests = "^2.28.11.1"
types-python-dateutil = "^2.8.19"
types-redis = "^4.3.21.1"
mypy = "^0.991"
ipython = "^8.9.0"
jupyterlab = "^3.5.3"
types-requests = "^2.28.11.8"
types-python-dateutil = "^2.8.19.6"
types-redis = "^4.4.0.4"
types-Flask = "^1.1.6"
pytest-cov = "^4.0.0"

View File

@ -0,0 +1,25 @@
{
"uuid": "c5f2dfb4-21a1-42d8-a452-1d3c36a204ff",
"name": "Tea Matrix",
"type": "tea-matrix",
"description": "Tea Matrix",
"namespace": "tea-matrix",
"GalaxyCluster": [
{
"collection_uuid": "7eacd736-b093-4cc0-a56c-5f84de725dfb",
"type": "tea-matrix",
"value": "Milk in tea",
"tag_name": "misp-galaxy:tea-matrix=\"Milk in tea\"",
"description": "Milk in tea",
"uuid": "24430dc6-9c27-4b3c-a5e7-6dda478fffa0",
"distribution": "3",
"default": true,
"meta": {
"kill_chain": [
"tea:black"
]
},
"relationship_type": "ennemy-of"
}
]
}

View File

@ -8,8 +8,8 @@ import glob
import hashlib
from datetime import date, datetime
from pymisp import (MISPEvent, MISPSighting, MISPTag, MISPOrganisation,
MISPObject)
from pymisp import (MISPAttribute, MISPEvent, MISPGalaxy, MISPObject, MISPOrganisation,
MISPSighting, MISPTag)
from pymisp.exceptions import InvalidMISPObject
from pymisp.tools import GitVulnFinderObject
@ -68,6 +68,15 @@ class TestMISPEvent(unittest.TestCase):
del self.mispevent.uuid
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_event_galaxy(self):
self.init_event()
with open('tests/mispevent_testfiles/galaxy.json', 'r') as f:
galaxy = json.load(f)
misp_galaxy = MISPGalaxy()
misp_galaxy.from_dict(**galaxy)
self.mispevent.add_galaxy(misp_galaxy)
self.assertEqual(self.mispevent.galaxies[0].to_json(sort_keys=True, indent=2), json.dumps(galaxy, sort_keys=True, indent=2))
def test_attribute(self):
self.init_event()
a = self.mispevent.add_attribute('filename', 'bar.exe')
@ -87,6 +96,21 @@ class TestMISPEvent(unittest.TestCase):
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_attribute_galaxy(self):
self.init_event()
with open('tests/mispevent_testfiles/galaxy.json', 'r') as f:
galaxy = json.load(f)
misp_galaxy = MISPGalaxy()
misp_galaxy.from_dict(**galaxy)
attribute = MISPAttribute()
attribute.from_dict(**{'type': 'github-username', 'value': 'adulau'})
attribute.add_galaxy(misp_galaxy)
self.mispevent.add_attribute(**attribute)
self.assertEqual(
self.mispevent.attributes[0].galaxies[0].to_json(sort_keys=True, indent=2),
json.dumps(galaxy, sort_keys=True, indent=2)
)
def test_to_dict_json_format(self):
misp_event = MISPEvent()
av_signature_object = MISPObject("av-signature")
@ -130,6 +154,22 @@ class TestMISPEvent(unittest.TestCase):
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def test_object_galaxy(self):
self.init_event()
misp_object = MISPObject('github-user')
misp_object.add_attribute('username', 'adulau')
misp_object.add_attribute('repository', 'cve-search')
self.mispevent.add_object(misp_object)
with open('tests/mispevent_testfiles/galaxy.json', 'r') as f:
galaxy = json.load(f)
misp_galaxy = MISPGalaxy()
misp_galaxy.from_dict(**galaxy)
self.mispevent.objects[0].attributes[0].add_galaxy(misp_galaxy)
self.assertEqual(
self.mispevent.objects[0].attributes[0].galaxies[0].to_json(sort_keys=True, indent=2),
json.dumps(galaxy, sort_keys=True, indent=2)
)
def test_malware(self):
with open('tests/mispevent_testfiles/simple.json', 'rb') as f:
pseudofile = BytesIO(f.read())

View File

@ -679,6 +679,31 @@ class TestComprehensive(unittest.TestCase):
self.admin_misp_connector.delete_event(first)
self.admin_misp_connector.delete_event(second)
def test_search_decay(self):
# Creating event 1
first = self.create_simple_event()
first.add_attribute('ip-dst', '8.8.8.8')
first.publish()
try:
r = self.admin_misp_connector.update_decaying_models()
self.assertTrue(r['success'], r)
simple_decaying_model = None
models = self.admin_misp_connector.decaying_models(pythonify=True)
for model in models:
if model.name == 'NIDS Simple Decaying Model':
simple_decaying_model = model
self.assertTrue(simple_decaying_model, models)
self.admin_misp_connector.enable_decaying_model(simple_decaying_model)
# TODO: check the response, it is curently an empty list
first = self.pub_misp_connector.add_event(first, pythonify=True)
result = self.pub_misp_connector.search('attributes', to_ids=1, includeDecayScore=True, pythonify=True)
self.assertTrue(result[0].decay_score, result[0].to_json(indent=2))
self.admin_misp_connector.disable_decaying_model(simple_decaying_model)
# TODO: check the response, it is curently a list of all the models
finally:
# Delete event
self.admin_misp_connector.delete_event(first)
def test_default_distribution(self):
'''The default distributions on the VM are This community only for the events and Inherit from event for attr/obj)'''
first = self.create_simple_event()
@ -1262,8 +1287,14 @@ class TestComprehensive(unittest.TestCase):
self.assertTrue('successfully' in r['message'].lower() and f'({second.id})' in r['message'], r['message'])
second = self.user_misp_connector.get_event(second.id, pythonify=True)
self.assertTrue('generic_tag_test' == second.tags[0].name)
# # Test local tag, shouldn't update the timestamp
old_ts = second.timestamp
r = self.admin_misp_connector.tag(second, 'generic_tag_test_local', local=True)
second = self.user_misp_connector.get_event(second.id, pythonify=True)
self.assertEqual(old_ts, second.timestamp)
r = self.admin_misp_connector.untag(second, 'generic_tag_test')
r = self.admin_misp_connector.untag(second, 'generic_tag_test_local')
self.assertTrue(r['message'].endswith(f'successfully removed from Event({second.id}).'), r['message'])
second = self.user_misp_connector.get_event(second.id, pythonify=True)
self.assertFalse(second.tags)