# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://help.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
version: 2
- package-ecosystem: "pip"
directory: "/"
interval: "daily"
- package-ecosystem: "github-actions"
directory: "/"
# Check for updates to GitHub Actions every weekday
interval: "daily"
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
name: "CodeQL"
branches: [ "main" ]
# The branches below must be a subset of the branches above
branches: [ "main" ]
- cron: '21 10 * * 1'
name: Analyze
runs-on: ubuntu-latest
actions: read
contents: read
security-events: write
fail-fast: false
language: [ 'python' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python', 'ruby' ]
# Learn more about CodeQL language support at https://aka.ms/codeql-docs/language-support
- name: Checkout repository
uses: actions/checkout@v3
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v2
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# Details on CodeQL's query packs refer to : https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs
# queries: security-extended,security-and-quality
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v2
# ℹ️ Command-line programs to run using the OS shell.
# 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun
# If the Autobuild fails above, remove it and uncomment the following three lines.
# modify them (or add more) to build your code if your project, please refer to the EXAMPLE below for guidance.
# - run: |
# echo "Run, Build Application using script"
# ./location_of_script_within_repo/buildscript.sh
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v2
category: "/language:${{matrix.language}}"
- uses: actions/checkout@v2
- uses: actions/checkout@v3
submodules: recursive
- name: Set up Python ${{matrix.python-version}}
uses: actions/setup-python@v2
uses: actions/setup-python@v4
python-version: ${{matrix.python-version}}
poetry run mypy tests/testlive_comprehensive.py tests/test_mispevent.py tests/testlive_sync.py pymisp
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v1
uses: codecov/codecov-action@v3
v2.4.162.1 (2022-10-02)
- Bump deps and version. [Raphaël Vinot]
Fix LIEF vuln.
- Bump deps, objects. [Raphaël Vinot]
- Change DNS warning list test. [Raphaël Vinot]
v2.4.162 (2022-09-09)
- Pass arbitrary headers to a PyMISP request. [Raphaël Vinot]
- Allow to force the timestamps in to_dict/to_json, even if a change was
made. [Raphaël Vinot]
- Bump changelog. [Raphaël Vinot]
- Bump version. [Raphaël Vinot]
- Bump deps. [Raphaël Vinot]
- Add in sort/desc for sorting results and limit/page for pagination.
[Tom King]
- Improve documentation for add_attribute. [Raphaël Vinot]
- Missing place to update version. [Raphaël Vinot]
v2.4.160.1 (2022-08-09)
- Bump changelog. [Raphaël Vinot]
- Bump deps. [Raphaël Vinot]
- Make keepalive configuration linux only. [Raphaël Vinot]
Bump deps
v2.4.160 (2022-08-05)
- Enable TCP keepalive. [Raphaël Vinot]
- Bump deps. [Raphaël Vinot]
- Bump version, deps. [Raphaël Vinot]
- Improve warning on invalid template, bump deps. [Raphaël Vinot]
- Bump deps. [Raphaël Vinot]
- Make mypy happy. [Raphaël Vinot]
- Bump deps. [Raphaël Vinot]
- Bump deps. [Raphaël Vinot]
- Bump deps. [Raphaël Vinot]
- Add in test case. [Tom King]
- Add ability to filter by sharing group for RestSearch for MISP >=
v2.4.158. [Tom King]
- Delete sharing group after deleting the event. [Raphaël Vinot]
- Give more time to MISP to publish the events before searching.
[Raphaël Vinot]
- Improper json check on non-json responses. [Raphaël Vinot]
Fix #854
- Mark all attributes in a soft deleted object as soft deleted too.
[Raphaël Vinot]
Bump misp-objects, deps.
- Make flake8 happy. [Raphaël Vinot]
- Properly convert MSG to EML. [Raphaël Vinot]
- Update lock file. [Raphaël Vinot]
- [feed] fixes bug when template_uuid does not exist. [Christophe
- Update api.py. [Derekt2]
- Fix typo in logging message. [Philipp Hauswirth]
- Fig: [feed] fixes bugs during export with old data. [Christophe
- Update pyproject.toml. [Steven]
Add publicsuffixlist optional package for URL Object, which has a more current list than pyfaup
- Fix multiple_space warning. [malvidin]
- Option to include more URLObject attributes Add publicsuffixlist faup
for URLObject Windows support URLObject with PSLFaup prefers IP to
host/domain. [malvidin]
- Ensure that keys are sorted in the returned `_to_feed()` dictionary.
[Yun Zheng Hu]
This allows for better deterministic feed output generation.
v2.4.159 (2022-05-30)
- [example:copyTagsFromAttributesToEvent] Added script to copy tags from
attributes to the event level. [Sami Mokaddem]
- Bump version. [Raphaël Vinot]
- Bump deps. [Raphaël Vinot]
- Massive bump deps for python 3.7. [Raphaël Vinot]
v2.4.157 (2022-03-24)
- Bump object templates. [Raphaël Vinot]
- Bump changelog. [Raphaël Vinot]
- Bump changelog. [Raphaël Vinot]
- Bump version. [Raphaël Vinot]
- Bump deps, objects. [Raphaël Vinot]
__version__ = '2.4.159'
__version__ = ''
import logging
import sys
import warnings
@ -280,6 +280,14 @@ class AbstractMISP(MutableMapping, MISPFileCache, metaclass=ABCMeta):
def __len__(self) -> int:
return len([k for k in self.__dict__.keys() if not (k[0] == '_' or k in self.__not_jsonable)])
def force_timestamp(self) -> bool:
return self.__force_timestamps
def force_timestamp(self, force: bool):
self.__force_timestamps = force
def edited(self) -> bool:
"""Recursively check if an object has been edited and update the flag accordingly
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import TypeVar, Optional, Tuple, List, Dict, Union, Any, Mapping, Iterable
from typing import TypeVar, Optional, Tuple, List, Dict, Union, Any, Mapping, Iterable, MutableMapping
from datetime import date, datetime
import csv
from pathlib import Path
@ -28,6 +28,18 @@ from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog, MISPObje
MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion
from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types
if sys.platform == 'linux':
# Enable TCP keepalive by default on every requests
import socket
from urllib3.connection import HTTPConnection
HTTPConnection.default_socket_options = HTTPConnection.default_socket_options + [
(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), # enable keepalive
(socket.SOL_TCP, socket.TCP_KEEPIDLE, 30), # Start pinging after 30s of idle time
(socket.SOL_TCP, socket.TCP_KEEPINTVL, 10), # ping every 10s
(socket.SOL_TCP, socket.TCP_KEEPCNT, 6) # kill the connection if 6 ping fail (60s total)
# cached_property exists since Python 3.8
from functools import cached_property # type: ignore
@ -138,11 +150,16 @@ class PyMISP:
:param cert: Client certificate, as described here: http://docs.python-requests.org/en/master/user/advanced/#client-side-certificates
:param auth: The auth parameter is passed directly to requests, as described here: http://docs.python-requests.org/en/master/user/authentication/
:param tool: The software using PyMISP (string), used to set a unique user-agent
:param http_headers: Arbitrary headers to pass to all the requests.
:param timeout: Timeout, as described here: https://requests.readthedocs.io/en/master/user/advanced/#timeouts
def __init__(self, url: str, key: str, ssl: bool = True, debug: bool = False, proxies: Mapping = {},
cert: Tuple[str, tuple] = None, auth: AuthBase = None, tool: str = '', timeout: Optional[Union[float, Tuple[float, float]]] = None):
def __init__(self, url: str, key: str, ssl: bool = True, debug: bool = False, proxies: Optional[MutableMapping[str, str]] = None,
cert: Optional[Union[str, Tuple[str, str]]] = None, auth: AuthBase = None, tool: str = '',
timeout: Optional[Union[float, Tuple[float, float]]] = None,
http_headers: Optional[Dict[str, str]]=None
if not url:
raise NoURL('Please provide the URL of your MISP instance.')
if not key:
@ -151,14 +168,16 @@ class PyMISP:
self.root_url: str = url
self.key: str = key
self.ssl: bool = ssl
self.proxies: Mapping[str, str] = proxies
self.cert: Optional[Tuple[str, tuple]] = cert
self.proxies: Optional[MutableMapping[str, str]] = proxies
self.cert: Optional[Union[str, Tuple[str, str]]] = cert
self.auth: Optional[AuthBase] = auth
self.tool: str = tool
self.timeout: Optional[Union[float, Tuple[float, float]]] = timeout
self.__session = requests.Session() # use one session to keep connection between requests
if brotli_supported():
self.__session.headers['Accept-Encoding'] = ', '.join(('br', 'gzip', 'deflate'))
if http_headers:
self.global_pythonify = False
@ -176,7 +195,7 @@ class PyMISP:
pymisp_version_tup = tuple(int(x) for x in __version__.split('.'))
recommended_version_tup = tuple(int(x) for x in response['version'].split('.'))
if recommended_version_tup < pymisp_version_tup[:3]:
logger.info(f"The version of PyMISP recommended by the MISP instance (response['version']) is older than the one you're using now ({__version__}). If you have a problem, please upgrade the MISP instance or use an older PyMISP version.")
logger.info(f"The version of PyMISP recommended by the MISP instance ({response['version']}) is older than the one you're using now ({__version__}). If you have a problem, please upgrade the MISP instance or use an older PyMISP version.")
elif pymisp_version_tup[:3] < recommended_version_tup:
logger.warning(f"The version of PyMISP recommended by the MISP instance ({response['version']}) is newer than the one you're using now ({__version__}). Please upgrade PyMISP.")
@ -1172,6 +1191,17 @@ class PyMISP:
response = self._prepare_request('POST', 'taxonomies/update')
return self._check_json_response(response)
def set_taxonomy_required(self, taxonomy: Union[MISPTaxonomy, int, str], required: bool = False) -> Dict:
taxonomy_id = get_uuid_or_id_from_abstract_misp(taxonomy)
url = urljoin(self.root_url, 'taxonomies/toggleRequired/{}'.format(taxonomy_id))
payload = {
"Taxonomy": {
"required": required
response = self._prepare_request('POST', url, data=payload)
return self._check_json_response(response)
# ## END Taxonomies ###
# ## BEGIN Warninglists ###
@ -2569,7 +2599,7 @@ class PyMISP:
return self._csv_to_dict(normalized_response_text) # type: ignore
return normalized_response_text
elif return_format in ['stix-xml', 'text']:
elif return_format not in ['json', 'yara-json']:
return self._check_response(response)
normalized_response = self._check_json_response(response)
@ -2647,6 +2677,10 @@ class PyMISP:
]] = None,
sharinggroup: Optional[List[SearchType]] = None,
minimal: Optional[bool] = None,
sort: Optional[str] = None,
desc: Optional[bool] = None,
limit: Optional[int] = None,
page: Optional[int] = None,
pythonify: Optional[bool] = None) -> Union[Dict, List[MISPEvent]]:
"""Search event metadata shown on the event index page. Using ! in front of a value
means NOT, except for parameters date_from, date_to and timestamp which cannot be negated.
@ -2678,6 +2712,10 @@ class PyMISP:
:param publish_timestamp: Filter on event's publish timestamp.
:param sharinggroup: Restrict by a sharing group | list
:param minimal: Return only event ID, UUID, timestamp, sighting_timestamp and published.
:param sort: The field to sort the events by, such as 'id', 'date', 'attribute_count'.
:param desc: Whether to sort events ascending (default) or descending.
:param limit: Limit the number of events returned
:param page: If a limit is set, sets the page to be returned. page 3, limit 100 will return records 201->300).
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output.
Warning: it might use a lot of RAM
@ -2696,7 +2734,8 @@ class PyMISP:
query['timestamp'] = (self._make_timestamp(timestamp[0]), self._make_timestamp(timestamp[1]))
query['timestamp'] = self._make_timestamp(timestamp)
if query.get("sort"):
query["direction"] = "desc" if desc else "asc"
url = urljoin(self.root_url, 'events/index')
response = self._prepare_request('POST', url, data=query)
normalized_response = self._check_json_response(response)
@ -3514,7 +3553,8 @@ class PyMISP:
def _check_response(self, response: requests.Response, lenient_response_type: bool = False, expect_json: bool = False) -> Union[Dict, str]:
"""Check if the response from the server is not an unexpected error"""
if response.status_code >= 500:
logger.critical(everything_broken.format(response.request.headers, response.request.body, response.text))
headers_without_auth = {i: response.request.headers[i] for i in response.request.headers if i != 'Authorization'}
logger.critical(everything_broken.format(headers_without_auth, response.request.body, response.text))
raise MISPServerError(f'Error code 500:\n{response.text}')
if 400 <= response.status_code < 500:
@ -3575,7 +3615,6 @@ class PyMISP:
# CakePHP params in URL
to_append_url = '/'.join([f'{k}:{v}' for k, v in kw_params.items()])
url = f'{url}/{to_append_url}'
req = requests.Request(request_type, url, data=d, params=params)
user_agent = f'PyMISP {__version__} - Python {".".join(str(x) for x in sys.version_info[:2])}'
if self.tool:
Subproject commit db9d79b093d77e09ba1dcec36cfefc00379bf73c
Subproject commit 06df3688900a24a43e101d39919d7a2c29d351ca
@ -793,6 +793,12 @@ class MISPObject(AbstractMISP):
def _to_feed(self, with_distribution=False) -> Dict:
if with_distribution:
if not hasattr(self, 'template_uuid'): # workaround for old events where the template_uuid was not yet mandatory
self.template_uuid = str(uuid.uuid5(uuid.UUID("9319371e-2504-4128-8410-3741cebbcfd3"), self.name))
if not hasattr(self, 'description'): # workaround for old events where description is not always set
self.description = '<unknown>'
if not hasattr(self, 'meta-category'): # workaround for old events where meta-category is not always set
setattr(self, 'meta-category', 'misc')
to_return = super(MISPObject, self)._to_feed()
if self.references:
to_return['ObjectReference'] = [reference._to_feed() for reference in self.references]
@ -843,6 +849,7 @@ class MISPObject(AbstractMISP):
def delete(self):
"""Mark the object as deleted (soft delete)"""
self.deleted = True
[a.delete() for a in self.attributes]
def disable_validation(self):
@ -995,8 +1002,16 @@ class MISPObject(AbstractMISP):
return all(relation in self._fast_attribute_access for relation in list_of_relations)
def add_attribute(self, object_relation: str, simple_value: Optional[Union[str, int, float]] = None, **value) -> Optional[MISPAttribute]:
"""Add an attribute. object_relation is required and the value key is a
dictionary with all the keys supported by MISPAttribute"""
"""Add an attribute.
:param object_relation: The object relation of the attribute you're adding to the object
:param simple_value: The value
:param value: dictionary with all the keys supported by MISPAttribute
Note: as long as PyMISP knows about the object template, only the object_relation and the simple_value are required.
If PyMISP doesn't know the template, you also **must** pass a type.
All the other options that can be passed along when creating an attribute (comment, IDS flag, ...)
will be either taked out of the template, or out of the default setting for the type as defined on the MISP instance.
if simple_value is not None: # /!\ The value *can* be 0
value['value'] = simple_value
if value.get('value') is None:
@ -1023,7 +1038,7 @@ class MISPObject(AbstractMISP):
attribute = MISPObjectAttribute(self._definition['attributes'][object_relation])
# Woopsie, this object_relation is unknown, no sane defaults for you.
logger.warning("The template ({}) doesn't have the object_relation ({}) you're trying to add.".format(self.name, object_relation))
logger.warning("The template ({}) doesn't have the object_relation ({}) you're trying to add. If you are creating a new event to push to MISP, please review your code so it matches the template.".format(self.name, object_relation))
attribute = MISPObjectAttribute({})
attribute = MISPObjectAttribute({})
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import ipaddress
import socket
import idna
from publicsuffixlist import PublicSuffixList # type: ignore
from urllib.parse import urlparse, urlunparse
class UrlNotDecoded(Exception):
class PSLFaup(object):
Fake Faup Python Library using PSL for Windows support
def __init__(self):
self.decoded = False
self.psl = PublicSuffixList()
self._url = None
self._retval = {}
self.ip_as_host = False
def _clear(self):
self.decoded = False
self._url = None
self._retval = {}
self.ip_as_host = False
def decode(self, url) -> None:
This function creates a dict of all the url fields.
:param url: The URL to normalize
if isinstance(url, bytes) and b'//' not in url[:10]:
url = b'//' + url
elif '//' not in url[:10]:
url = '//' + url
self._url = urlparse(url)
self.ip_as_host = False
hostname = _ensure_str(self._url.hostname)
ipv4_bytes = socket.inet_aton(_ensure_str(hostname))
ipv4 = ipaddress.IPv4Address(ipv4_bytes)
self.ip_as_host = ipv4.compressed
except (OSError, ValueError):
addr, _, _ = hostname.partition('%')
ipv6 = ipaddress.IPv6Address(addr)
self.ip_as_host = ipv6.compressed
except ValueError:
self.decoded = True
self._retval = {}
def url(self):
if not self.decoded:
raise UrlNotDecoded("You must call faup.decode() first")
netloc = self.get_host() + ('' if self.get_port() is None else ':{}'.format(self.get_port()))
return _ensure_bytes(
(self.get_scheme(), netloc, self.get_resource_path(),
'', self.get_query_string(), self.get_fragment(),)
def get_scheme(self):
Get the scheme of the url given in the decode function
:returns: The URL scheme
if not self.decoded:
raise UrlNotDecoded("You must call faup.decode() first")
return _ensure_str(self._url.scheme)
def get_credential(self):
if not self.decoded:
raise UrlNotDecoded("You must call faup.decode() first")
if self._url.password:
return _ensure_str(self._url.username) + ':' + _ensure_str(self._url.password)
if self._url.username:
return _ensure_str(self._url.username)
def get_subdomain(self):
if not self.decoded:
raise UrlNotDecoded("You must call faup.decode() first")
if self.get_host() is not None and not self.ip_as_host:
if self.get_domain() in self.get_host():
return self.get_host().rsplit(self.get_domain(), 1)[0].rstrip('.') or None
def get_domain(self):
if not self.decoded:
raise UrlNotDecoded("You must call faup.decode() first")
if self.get_host() is not None and not self.ip_as_host:
return self.psl.privatesuffix(self.get_host())
def get_domain_without_tld(self):
if not self.decoded:
raise UrlNotDecoded("You must call faup.decode() first")
if self.get_tld() is not None and not self.ip_as_host:
return self.get_domain().rsplit(self.get_tld(), 1)[0].rstrip('.')
def get_host(self):
if not self.decoded:
raise UrlNotDecoded("You must call faup.decode() first")
if self._url.hostname is None:
return None
elif self._url.hostname.isascii():
return _ensure_str(self._url.hostname)
return _ensure_str(idna.encode(self._url.hostname, uts46=True))
def get_unicode_host(self):
if not self.decoded:
raise UrlNotDecoded("You must call faup.decode() first")
if not self.ip_as_host:
return idna.decode(self.get_host(), uts46=True)
def get_tld(self):
if not self.decoded:
raise UrlNotDecoded("You must call faup.decode() first")
if self.get_host() is not None and not self.ip_as_host:
return self.psl.publicsuffix(self.get_host())
def get_port(self):
if not self.decoded:
raise UrlNotDecoded("You must call faup.decode() first")
return self._url.port
def get_resource_path(self):
if not self.decoded:
raise UrlNotDecoded("You must call faup.decode() first")
return _ensure_str(self._url.path)
def get_query_string(self):
if not self.decoded:
raise UrlNotDecoded("You must call faup.decode() first")
return _ensure_str(self._url.query)
def get_fragment(self):
if not self.decoded:
raise UrlNotDecoded("You must call faup.decode() first")
return _ensure_str(self._url.fragment)
def get(self):
self._retval["scheme"] = self.get_scheme()
self._retval["tld"] = self.get_tld()
self._retval["domain"] = self.get_domain()
self._retval["domain_without_tld"] = self.get_domain_without_tld()
self._retval["subdomain"] = self.get_subdomain()
self._retval["host"] = self.get_host()
self._retval["port"] = self.get_port()
self._retval["resource_path"] = self.get_resource_path()
self._retval["query_string"] = self.get_query_string()
self._retval["fragment"] = self.get_fragment()
self._retval["url"] = self.url
return self._retval
def _ensure_bytes(binary) -> bytes:
if isinstance(binary, bytes):
return binary
return binary.encode('utf-8')
def _ensure_str(string) -> str:
if isinstance(string, str):
return string
return string.decode('utf-8')
import ipaddress
import email.utils
from email import policy, message_from_bytes
from email.utils import parsedate_to_datetime
from email.message import EmailMessage
from io import BytesIO
from pathlib import Path
from typing import Union, List, Tuple, Dict, cast
from typing import Union, List, Tuple, Dict, cast, Any
from extract_msg import openMsg # type: ignore
from extract_msg.message import Message as MsgObj # type: ignore
from extract_msg import openMsg
from extract_msg.message import Message as MsgObj
from RTFDE.exceptions import MalformedEncapsulatedRtf, NotEncapsulatedRtf # type: ignore
from RTFDE.deencapsulate import DeEncapsulator # type: ignore
from oletools.common.codepages import codepage2codec # type: ignore
@ -101,10 +100,9 @@ class EMailObject(AbstractMISPObjectGenerator):
eml = self._build_eml(message, body, attachments)
return eml
def _extract_msg_objects(self, msg_obj: MsgObj):
def _extract_msg_objects(self, msg_obj: MsgObj) -> Tuple[EmailMessage, Dict, List[Any]]:
"""Extracts email objects needed to construct an eml from a msg."""
original_eml_header = msg_obj._getStringStream('__substg1.0_007D')
message = email.message_from_string(original_eml_header, policy=policy.default)
message: EmailMessage = email.message_from_string(msg_obj.header.as_string(), policy=policy.default) # type: ignore
body = {}
if msg_obj.body is not None:
body['text'] = {"obj": msg_obj.body,
@ -276,9 +274,8 @@ class EMailObject(AbstractMISPObjectGenerator):
if headers:
self.add_attribute("header", "\n".join(headers))
if "Date" in message:
if "Date" in message and message.get('date').datetime is not None:
self.add_attribute("send-date", message.get('date').datetime)
if "To" in message:
self.__add_emails("to", message["To"])
from .abstractgenerator import AbstractMISPObjectGenerator
import logging
from pyfaup.faup import Faup # type: ignore
from urllib.parse import unquote_plus
from pyfaup.faup import Faup # type: ignore
except (OSError, ImportError):
from ._psl_faup import PSLFaup as Faup
logger = logging.getLogger('pymisp')
faup = Faup()
@ -13,8 +17,9 @@ faup = Faup()
class URLObject(AbstractMISPObjectGenerator):
def __init__(self, url: str, **kwargs):
def __init__(self, url: str, generate_all=False, **kwargs):
super().__init__('url', **kwargs)
self._generate_all = True if generate_all is True else False
@ -24,3 +29,28 @@ class URLObject(AbstractMISPObjectGenerator):
self.add_attribute('host', value=faup.get_host())
if faup.get_domain():
self.add_attribute('domain', value=faup.get_domain())
if self._generate_all:
if hasattr(faup, 'ip_as_host') and faup.ip_as_host:
self.attributes = [attr for attr in self.attributes
if attr.object_relation not in ('host', 'domain')]
self.add_attribute('ip', value=faup.ip_as_host)
if faup.get_credential():
self.add_attribute('credential', value=faup.get_credential())
if faup.get_fragment():
self.add_attribute('fragment', value=faup.get_fragment())
if faup.get_port():
self.add_attribute('port', value=faup.get_port())
if faup.get_query_string():
self.add_attribute('query_string', value=faup.get_query_string())
if faup.get_resource_path():
self.add_attribute('resource_path', value=faup.get_resource_path())
if faup.get_scheme():
self.add_attribute('scheme', value=faup.get_scheme())
if faup.get_tld():
self.add_attribute('tld', value=faup.get_tld())
if faup.get_domain_without_tld():
self.add_attribute('domain_without_tld', value=faup.get_domain_without_tld())
if faup.get_subdomain():
self.add_attribute('subdomain', value=faup.get_subdomain())
if hasattr(faup, 'get_unicode_host') and faup.get_unicode_host() != faup.get_host():
self.add_attribute('text', value=faup.get_unicode_host())
name = "pymisp"
version = "2.4.159"
version = ""
description = "Python API for MISP."
authors = ["Raphaël Vinot <raphael.vinot@circl.lu>"]
license = "BSD-2-Clause"
python = "^3.7"
requests = "^2.27.1"
requests = "^2.28.1"
python-dateutil = "^2.8.2"
jsonschema = "^4.6.0"
jsonschema = "^4.16.0"
deprecated = "^1.2.13"
extract_msg = {version = "^0.33.0", optional = true}
extract_msg = {version = "^0.36.4", optional = true}
RTFDE = {version = "^0.0.2", optional = true}
oletools = {version = "^0.60.1", optional = true}
python-magic = {version = "^0.4.27", optional = true}
pydeep2 = {version = "^0.5.1", optional = true}
lief = {version = "^0.12.1", optional = true}
lief = {version = "^0.12.2", optional = true}
beautifulsoup4 = {version = "^4.11.1", optional = true}
validators = {version = "^0.20.0", optional = true}
sphinx-autodoc-typehints = {version = "^1.18.2", optional = true}
sphinx-autodoc-typehints = {version = "^1.19.4", optional = true}
recommonmark = {version = "^0.7.1", optional = true}
reportlab = {version = "^3.6.10", optional = true}
reportlab = {version = "^3.6.11", optional = true}
pyfaup = {version = "^1.2", optional = true}
chardet = {version = "^4.0.0", optional = true}
urllib3 = {extras = ["brotli"], version = "^1.26.9", optional = true}
publicsuffixlist = {version = "^0.9.0", optional = true}
chardet = {version = "^5.0.0", optional = true}
urllib3 = {extras = ["brotli"], version = "^1.26.12", optional = true}
fileobjects = ['python-magic', 'pydeep2', 'lief']
@ -71,17 +72,17 @@ url = ['pyfaup', 'chardet']
email = ['extract_msg', "RTFDE", "oletools"]
brotli = ['urllib3']
requests-mock = "^1.9.3"
mypy = "^0.961"
requests-mock = "^1.10.0"
mypy = "^0.982"
ipython = "^7.34.0"
jupyterlab = "^3.4.3"
types-requests = "^2.27.30"
types-python-dateutil = "^2.8.17"
types-redis = "^4.2.6"
jupyterlab = "^3.4.8"
types-requests = "^"
types-python-dateutil = "^2.8.19"
types-redis = "^"
types-Flask = "^1.1.6"
pytest-cov = "^3.0.0"
pytest-cov = "^4.0.0"
requires = ["poetry_core>=1.1", "setuptools"]
build-backend = "poetry.core.masonry.api"
except ImportError as e:
url = 'https://localhost:8443'
key = 'i8ckGjsyrfRSCPqE0qqr0XJbsLlfbOyYDzdSDawM'
key = 'sL9hrjIyY405RyGQHLx5DoCAM92BNmmGa8P4ck1E'
verifycert = False
@ -300,6 +300,35 @@ class TestComprehensive(unittest.TestCase):
def test_search_index(self):
first, second, third = self.environment()
# Search as admin
events = self.admin_misp_connector.search_index(timestamp=first.timestamp.timestamp(), pythonify=True)
self.assertEqual(len(events), 3)
for e in events:
self.assertIn(e.id, [first.id, second.id, third.id])
# Test limit and pagination
event_one = self.admin_misp_connector.search_index(timestamp=first.timestamp.timestamp(), limit=1, page=1, pythonify=True)[0]
event_two = self.admin_misp_connector.search_index(timestamp=first.timestamp.timestamp(), limit=1, page=2, pythonify=True)[0]
self.assertTrue(event_one.id != event_two.id)
two_events = self.admin_misp_connector.search_index(limit=2)
self.assertTrue(len(two_events), 2)
# Test ordering by the Info field. Can't use timestamp as each will likely have the same
event = self.admin_misp_connector.search_index(timestamp=first.timestamp.timestamp(), sort="info", desc=True, limit=1, pythonify=True)[0]
# First|Second|*Third* event
self.assertEqual(event.id, third.id)
# *First*|Second|Third event
event = self.admin_misp_connector.search_index(timestamp=first.timestamp.timestamp(), sort="info", desc=False, limit=1, pythonify=True)[0]
self.assertEqual(event.id, first.id)
# Delete event
def test_search_objects(self):
'''Search for objects'''
@ -889,7 +918,7 @@ class TestComprehensive(unittest.TestCase):
# Test PyMISP.add_attribute with enforceWarninglist enabled
_e = events[0]
_a = _e.add_attribute('ip-src', '', enforceWarninglist=True)
_a = _e.add_attribute('ip-src', '', enforceWarninglist=True)
_a = self.user_misp_connector.add_attribute(_e, _a)
self.assertTrue('trips over a warninglist and enforceWarninglist is enforced' in _a['errors'][1]['errors'], _a)
@ -1096,6 +1125,7 @@ class TestComprehensive(unittest.TestCase):
first.attributes[0].to_ids = True
first = self.user_misp_connector.update_event(first)
self.admin_misp_connector.publish(first, alert=False)
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp())
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
@ -1164,6 +1194,7 @@ class TestComprehensive(unittest.TestCase):
first = self.user_misp_connector.add_event(first)
text = self.user_misp_connector.search(return_format='text', eventid=first.id)
self.assertEqual('', text.strip())
@ -1626,6 +1657,16 @@ class TestComprehensive(unittest.TestCase):
r = self.admin_misp_connector.disable_taxonomy(tax)
self.assertEqual(r['message'], 'Taxonomy disabled')
# Test toggling the required status
r = self.admin_misp_connector.set_taxonomy_required(tax, not tax.required)
self.assertEqual(r['message'], 'Taxonomy toggleRequireded')
updatedTax = self.admin_misp_connector.get_taxonomy(tax, pythonify=True)
self.assertFalse(tax.required == updatedTax.required)
# Return back to default required status
r = self.admin_misp_connector.set_taxonomy_required(tax, not tax.required)
def test_warninglists(self):
# Make sure we're up-to-date
r = self.admin_misp_connector.update_warninglists()
@ -2234,6 +2275,7 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(sharing_group.sgorgs[0].org_id, self.test_org.id)
def test_sharing_group_search(self):
# Add sharing group
@ -2277,8 +2319,10 @@ class TestComprehensive(unittest.TestCase):
# We should not be missing any of the attributes
def test_feeds(self):
# Add
