mirror of https://github.com/MISP/PyMISPGalaxies
commit
4a35a5a16d
|
@ -33,7 +33,7 @@ jobs:
|
||||||
|
|
||||||
- name: Test with nosetests
|
- name: Test with nosetests
|
||||||
run: |
|
run: |
|
||||||
poetry run pytest --cov=pymispgalaxies tests/tests.py
|
poetry run pytest --cov=pymispgalaxies tests/test*.py
|
||||||
poetry run mypy .
|
poetry run mypy .
|
||||||
|
|
||||||
- name: Upload coverage to Codecov
|
- name: Upload coverage to Codecov
|
||||||
|
|
|
@ -1,26 +1,10 @@
|
||||||
{
|
{
|
||||||
"nbformat": 4,
|
|
||||||
"nbformat_minor": 0,
|
|
||||||
"metadata": {
|
|
||||||
"colab": {
|
|
||||||
"provenance": [],
|
|
||||||
"authorship_tag": "ABX9TyOFSmnINQ4YRBroomWdb+/2",
|
|
||||||
"include_colab_link": true
|
|
||||||
},
|
|
||||||
"kernelspec": {
|
|
||||||
"name": "python3",
|
|
||||||
"display_name": "Python 3"
|
|
||||||
},
|
|
||||||
"language_info": {
|
|
||||||
"name": "python"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"cells": [
|
"cells": [
|
||||||
{
|
{
|
||||||
"cell_type": "markdown",
|
"cell_type": "markdown",
|
||||||
"metadata": {
|
"metadata": {
|
||||||
"id": "view-in-github",
|
"colab_type": "text",
|
||||||
"colab_type": "text"
|
"id": "view-in-github"
|
||||||
},
|
},
|
||||||
"source": [
|
"source": [
|
||||||
"<a href=\"https://colab.research.google.com/github/sebdraven/PyMISPGalaxies/blob/main/Galaxie_MISP.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
|
"<a href=\"https://colab.research.google.com/github/sebdraven/PyMISPGalaxies/blob/main/Galaxie_MISP.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
|
||||||
|
@ -38,8 +22,8 @@
|
||||||
},
|
},
|
||||||
"outputs": [
|
"outputs": [
|
||||||
{
|
{
|
||||||
"output_type": "stream",
|
|
||||||
"name": "stdout",
|
"name": "stdout",
|
||||||
|
"output_type": "stream",
|
||||||
"text": [
|
"text": [
|
||||||
"Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/\n",
|
"Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/\n",
|
||||||
"Collecting PyMISPGalaxies\n",
|
"Collecting PyMISPGalaxies\n",
|
||||||
|
@ -65,38 +49,52 @@
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"cell_type": "code",
|
"cell_type": "code",
|
||||||
"source": [
|
"execution_count": 1,
|
||||||
"from pymispgalaxies import Clusters"
|
|
||||||
],
|
|
||||||
"metadata": {
|
"metadata": {
|
||||||
"id": "Gy_cjV42Faj-"
|
"id": "Gy_cjV42Faj-"
|
||||||
},
|
},
|
||||||
"execution_count": 2,
|
"outputs": [],
|
||||||
"outputs": []
|
"source": [
|
||||||
|
"from pymispgalaxies import Clusters"
|
||||||
|
]
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"cell_type": "markdown",
|
"cell_type": "markdown",
|
||||||
"source": [
|
|
||||||
"To Choose a cluster, the name of cluster is the name of file in <https://github.com/MISP/misp-galaxy/clusters/>, here we use malpedia"
|
|
||||||
],
|
|
||||||
"metadata": {
|
"metadata": {
|
||||||
"id": "Z_FZERTPMV0s"
|
"id": "Z_FZERTPMV0s"
|
||||||
}
|
},
|
||||||
|
"source": [
|
||||||
|
"To Choose a cluster, the name of cluster is the name of file in <https://github.com/MISP/misp-galaxy/clusters/>, here we use malpedia"
|
||||||
|
]
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"cell_type": "code",
|
"cell_type": "code",
|
||||||
"source": [
|
"execution_count": 2,
|
||||||
"cluster_malpedia = Clusters().get('malpedia') #corresponding to https://github.com/MISP/misp-galaxy/clusters/malpedia.json\n",
|
|
||||||
"cluster_malpedia"
|
|
||||||
],
|
|
||||||
"metadata": {
|
"metadata": {
|
||||||
"id": "pmsFAlTsFr_Q"
|
"id": "pmsFAlTsFr_Q"
|
||||||
},
|
},
|
||||||
"execution_count": null,
|
"outputs": [
|
||||||
"outputs": []
|
{
|
||||||
|
"data": {
|
||||||
|
"text/plain": [
|
||||||
|
"<pymispgalaxies.api.Cluster at 0x75b3464402c0>"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"execution_count": 2,
|
||||||
|
"metadata": {},
|
||||||
|
"output_type": "execute_result"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"source": [
|
||||||
|
"cluster_malpedia = Clusters().get('malpedia') #corresponding to https://github.com/MISP/misp-galaxy/clusters/malpedia.json\n",
|
||||||
|
"cluster_malpedia"
|
||||||
|
]
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"cell_type": "markdown",
|
"cell_type": "markdown",
|
||||||
|
"metadata": {
|
||||||
|
"id": "xf3vTuWsNzF6"
|
||||||
|
},
|
||||||
"source": [
|
"source": [
|
||||||
"To access in a entry json like Zeus:\n",
|
"To access in a entry json like Zeus:\n",
|
||||||
"\n",
|
"\n",
|
||||||
|
@ -154,17 +152,11 @@
|
||||||
" \"value\": \"Zeus\"\n",
|
" \"value\": \"Zeus\"\n",
|
||||||
" }\n",
|
" }\n",
|
||||||
" ```"
|
" ```"
|
||||||
],
|
]
|
||||||
"metadata": {
|
|
||||||
"id": "xf3vTuWsNzF6"
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"cell_type": "code",
|
"cell_type": "code",
|
||||||
"source": [
|
"execution_count": 9,
|
||||||
"zeus = cluster_malpedia.cluster_values['Zeus']\n",
|
|
||||||
"zeus.to_dict()"
|
|
||||||
],
|
|
||||||
"metadata": {
|
"metadata": {
|
||||||
"colab": {
|
"colab": {
|
||||||
"base_uri": "https://localhost:8080/"
|
"base_uri": "https://localhost:8080/"
|
||||||
|
@ -172,36 +164,38 @@
|
||||||
"id": "2dVS64R9Nxwu",
|
"id": "2dVS64R9Nxwu",
|
||||||
"outputId": "7ebb7915-c981-4814-e7cb-b4ba96aa409f"
|
"outputId": "7ebb7915-c981-4814-e7cb-b4ba96aa409f"
|
||||||
},
|
},
|
||||||
"execution_count": 8,
|
|
||||||
"outputs": [
|
"outputs": [
|
||||||
{
|
{
|
||||||
"output_type": "execute_result",
|
|
||||||
"data": {
|
"data": {
|
||||||
"text/plain": [
|
"text/plain": [
|
||||||
"{'value': 'Zeus',\n",
|
"{'value': 'Zeus',\n",
|
||||||
" 'uuid': '4e8c1ab7-2841-4823-a5d1-39284fb0969a',\n",
|
" 'uuid': '4e8c1ab7-2841-4823-a5d1-39284fb0969a',\n",
|
||||||
" 'meta': <pymispgalaxies.api.ClusterValueMeta at 0x7f870e02ba60>}"
|
" 'description': \"According to CrowdStrike, The two primary goals of the Zeus trojan horse virus are stealing people's financial information and adding machines to a botnet. Unlike many types of malware, most Zeus variants try to avoid doing long-term damage to the devices they infect. Their aim is to avoid detection from antivirus software.\",\n",
|
||||||
|
" 'meta': <pymispgalaxies.api.ClusterValueMeta at 0x75b33bee97f0>}"
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
|
"execution_count": 9,
|
||||||
"metadata": {},
|
"metadata": {},
|
||||||
"execution_count": 8
|
"output_type": "execute_result"
|
||||||
}
|
}
|
||||||
|
],
|
||||||
|
"source": [
|
||||||
|
"zeus = cluster_malpedia.get('Zeus')\n",
|
||||||
|
"zeus.to_dict()"
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"cell_type": "markdown",
|
"cell_type": "markdown",
|
||||||
"source": [
|
|
||||||
"To access at metadata"
|
|
||||||
],
|
|
||||||
"metadata": {
|
"metadata": {
|
||||||
"id": "SHmE7qcDPBcF"
|
"id": "SHmE7qcDPBcF"
|
||||||
}
|
},
|
||||||
|
"source": [
|
||||||
|
"To access at metadata"
|
||||||
|
]
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"cell_type": "code",
|
"cell_type": "code",
|
||||||
"source": [
|
"execution_count": 11,
|
||||||
"zeus.meta.to_dict()"
|
|
||||||
],
|
|
||||||
"metadata": {
|
"metadata": {
|
||||||
"colab": {
|
"colab": {
|
||||||
"base_uri": "https://localhost:8080/"
|
"base_uri": "https://localhost:8080/"
|
||||||
|
@ -209,77 +203,76 @@
|
||||||
"id": "-T6MYOzJOrVF",
|
"id": "-T6MYOzJOrVF",
|
||||||
"outputId": "bc22b364-a1a0-470e-d4ab-8e833e81753a"
|
"outputId": "bc22b364-a1a0-470e-d4ab-8e833e81753a"
|
||||||
},
|
},
|
||||||
"execution_count": 9,
|
|
||||||
"outputs": [
|
"outputs": [
|
||||||
{
|
{
|
||||||
"output_type": "execute_result",
|
|
||||||
"data": {
|
"data": {
|
||||||
"text/plain": [
|
"text/plain": [
|
||||||
"{'refs': ['https://malpedia.caad.fkie.fraunhofer.de/details/win.zeus',\n",
|
"{'refs': ['https://malpedia.caad.fkie.fraunhofer.de/details/win.zeus',\n",
|
||||||
" 'https://securelist.com/financial-cyberthreats-in-2020/101638/',\n",
|
" 'https://www.anomali.com/files/white-papers/russian-federation-country-profile.pdf',\n",
|
||||||
" 'https://www.kryptoslogic.com/blog/2021/07/trickbot-and-zeus/',\n",
|
|
||||||
" 'http://eternal-todo.com/blog/detecting-zeus',\n",
|
|
||||||
" 'https://www.symantec.com/connect/blogs/spyeye-s-kill-zeus-bark-worse-its-bite',\n",
|
|
||||||
" 'http://malwareint.blogspot.com/2010/02/zeus-on-irs-scam-remains-actively.html',\n",
|
|
||||||
" 'https://www.youtube.com/watch?v=LUxOcpIRxmg',\n",
|
|
||||||
" 'https://www.secureworks.com/research/threat-profiles/bronze-woodland',\n",
|
|
||||||
" 'http://www.symantec.com/content/en/us/enterprise/media/security_response/whitepapers/zeus_king_of_bots.pdf',\n",
|
|
||||||
" 'https://www.mnin.org/write/ZeusMalware.pdf',\n",
|
|
||||||
" 'https://www.secureworks.com/research/zeus?threat=zeus',\n",
|
|
||||||
" 'https://blog.malwarebytes.com/101/2021/07/the-life-and-death-of-the-zeus-trojan/',\n",
|
|
||||||
" 'https://krebsonsecurity.com/2019/12/inside-evil-corp-a-100m-cybercrime-menace/',\n",
|
|
||||||
" 'https://us-cert.cisa.gov/ncas/alerts/aa20-345a',\n",
|
|
||||||
" 'http://eternal-todo.com/blog/new-zeus-binary',\n",
|
|
||||||
" 'https://blog.talosintelligence.com/2022/02/threat-roundup-0204-0211.html',\n",
|
|
||||||
" 'http://malwareint.blogspot.com/2010/02/facebook-phishing-campaign-proposed-by.html',\n",
|
|
||||||
" 'https://blog.trendmicro.com/trendlabs-security-intelligence/kivars-with-venom-targeted-attacks-upgrade-with-64-bit-support/',\n",
|
|
||||||
" 'https://go.recordedfuture.com/hubfs/reports/cta-2021-0909.pdf',\n",
|
|
||||||
" 'http://contagiodump.blogspot.com/2010/07/zeus-trojan-research-links.html',\n",
|
|
||||||
" 'https://www.wired.com/2017/03/russian-hacker-spy-botnet/',\n",
|
|
||||||
" 'http://malwareint.blogspot.com/2009/07/special-zeus-botnet-for-dummies.html',\n",
|
|
||||||
" 'http://malwareint.blogspot.com/2010/03/new-phishing-campaign-against-facebook.html',\n",
|
|
||||||
" 'https://www.secureworks.com/research/threat-profiles/gold-evergreen',\n",
|
|
||||||
" 'http://contagiodump.blogspot.com/2010/07/zeus-version-scheme-by-trojan-author.html',\n",
|
|
||||||
" 'https://www.f5.com/labs/articles/education/banking-trojans-a-reference-guide-to-the-malware-family-tree',\n",
|
" 'https://www.f5.com/labs/articles/education/banking-trojans-a-reference-guide-to-the-malware-family-tree',\n",
|
||||||
" 'https://nakedsecurity.sophos.com/2010/07/24/sample-run/',\n",
|
" 'http://contagiodump.blogspot.com/2010/07/zeus-trojan-research-links.html',\n",
|
||||||
" 'https://www.justice.gov/opa/pr/four-individuals-plead-guilty-rico-conspiracy-involving-bulletproof-hosting-cybercriminals',\n",
|
|
||||||
" 'http://contagiodump.blogspot.com/2012/12/dec-2012-linuxchapro-trojan-apache.html',\n",
|
|
||||||
" 'https://www.cert.ssi.gouv.fr/uploads/CERTFR-2020-CTI-008.pdf',\n",
|
" 'https://www.cert.ssi.gouv.fr/uploads/CERTFR-2020-CTI-008.pdf',\n",
|
||||||
" 'https://www.symantec.com/connect/blogs/brief-look-zeuszbot-20',\n",
|
|
||||||
" 'https://www.trendmicro.com/content/dam/trendmicro/global/en/research/21/i/ssl-tls-technical-brief/ssl-tls-technical-brief.pdf',\n",
|
|
||||||
" 'https://www.secureworks.com/research/evolution-of-the-gold-evergreen-threat-group',\n",
|
|
||||||
" 'https://www.s21sec.com/en/zeus-the-missing-link/',\n",
|
|
||||||
" 'http://malwareint.blogspot.com/2010/01/leveraging-zeus-to-send-spam-through.html',\n",
|
|
||||||
" 'http://eternal-todo.com/blog/zeus-spreading-facebook',\n",
|
|
||||||
" 'https://web.archive.org/web/20160616170611/https://media.blackhat.com/bh-eu-10/presentations/Carrera_Silberman/BlackHat-EU-2010-Carrera-Silberman-State-of-Malware-slides.pdf',\n",
|
" 'https://web.archive.org/web/20160616170611/https://media.blackhat.com/bh-eu-10/presentations/Carrera_Silberman/BlackHat-EU-2010-Carrera-Silberman-State-of-Malware-slides.pdf',\n",
|
||||||
|
" 'http://eternal-todo.com/blog/detecting-zeus',\n",
|
||||||
|
" 'http://malwareint.blogspot.com/2010/03/new-phishing-campaign-against-facebook.html',\n",
|
||||||
|
" 'https://nakedsecurity.sophos.com/2010/07/24/sample-run/',\n",
|
||||||
|
" 'https://www.trendmicro.com/content/dam/trendmicro/global/en/research/21/i/ssl-tls-technical-brief/ssl-tls-technical-brief.pdf',\n",
|
||||||
|
" 'https://www.symantec.com/connect/blogs/brief-look-zeuszbot-20',\n",
|
||||||
|
" 'https://www.mnin.org/write/ZeusMalware.pdf',\n",
|
||||||
|
" 'https://securelist.com/financial-cyberthreats-in-2020/101638/',\n",
|
||||||
|
" 'https://www.secureworks.com/research/zeus?threat=zeus',\n",
|
||||||
|
" 'https://krebsonsecurity.com/2019/12/inside-evil-corp-a-100m-cybercrime-menace/',\n",
|
||||||
|
" 'http://eternal-todo.com/blog/zeus-spreading-facebook',\n",
|
||||||
|
" 'http://malwareint.blogspot.com/2010/02/facebook-phishing-campaign-proposed-by.html',\n",
|
||||||
|
" 'http://www.symantec.com/content/en/us/enterprise/media/security_response/whitepapers/zeus_king_of_bots.pdf',\n",
|
||||||
|
" 'https://www.kryptoslogic.com/blog/2021/07/trickbot-and-zeus/',\n",
|
||||||
|
" 'https://www.crowdstrike.com/cybersecurity-101/malware/trojan-zeus-malware',\n",
|
||||||
|
" 'https://www.wired.com/2017/03/russian-hacker-spy-botnet/',\n",
|
||||||
|
" 'https://us-cert.cisa.gov/ncas/alerts/aa20-345a',\n",
|
||||||
|
" 'https://www.s21sec.com/en/zeus-the-missing-link/',\n",
|
||||||
|
" 'https://www.symantec.com/connect/blogs/spyeye-s-kill-zeus-bark-worse-its-bite',\n",
|
||||||
|
" 'https://go.recordedfuture.com/hubfs/reports/cta-2021-0909.pdf',\n",
|
||||||
|
" 'https://www.justice.gov/opa/pr/four-individuals-plead-guilty-rico-conspiracy-involving-bulletproof-hosting-cybercriminals',\n",
|
||||||
|
" 'https://www.secureworks.com/research/evolution-of-the-gold-evergreen-threat-group',\n",
|
||||||
|
" 'https://www.youtube.com/watch?v=LUxOcpIRxmg',\n",
|
||||||
|
" 'https://www.secureworks.com/research/threat-profiles/gold-evergreen',\n",
|
||||||
|
" 'https://www.cisecurity.org/insights/blog/top-10-malware-march-2022',\n",
|
||||||
|
" 'https://www.secureworks.com/research/threat-profiles/bronze-woodland',\n",
|
||||||
|
" 'https://blog.malwarebytes.com/101/2021/07/the-life-and-death-of-the-zeus-trojan/',\n",
|
||||||
|
" 'https://blog.talosintelligence.com/2022/02/threat-roundup-0204-0211.html',\n",
|
||||||
|
" 'https://unit42.paloaltonetworks.com/banking-trojan-techniques/',\n",
|
||||||
|
" 'https://blog.trendmicro.com/trendlabs-security-intelligence/kivars-with-venom-targeted-attacks-upgrade-with-64-bit-support/',\n",
|
||||||
|
" 'http://malwareint.blogspot.com/2010/02/zeus-on-irs-scam-remains-actively.html',\n",
|
||||||
|
" 'http://malwareint.blogspot.com/2010/01/leveraging-zeus-to-send-spam-through.html',\n",
|
||||||
|
" 'http://eternal-todo.com/blog/new-zeus-binary',\n",
|
||||||
|
" 'http://contagiodump.blogspot.com/2012/12/dec-2012-linuxchapro-trojan-apache.html',\n",
|
||||||
|
" 'http://malwareint.blogspot.com/2009/07/special-zeus-botnet-for-dummies.html',\n",
|
||||||
" 'http://www.secureworks.com/research/threat-profiles/gold-evergreen',\n",
|
" 'http://www.secureworks.com/research/threat-profiles/gold-evergreen',\n",
|
||||||
" 'https://www.anomali.com/files/white-papers/russian-federation-country-profile.pdf'],\n",
|
" 'http://contagiodump.blogspot.com/2010/07/zeus-version-scheme-by-trojan-author.html'],\n",
|
||||||
" 'synonyms': ['Zbot']}"
|
" 'synonyms': ['Zbot']}"
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
|
"execution_count": 11,
|
||||||
"metadata": {},
|
"metadata": {},
|
||||||
"execution_count": 9
|
"output_type": "execute_result"
|
||||||
}
|
}
|
||||||
|
],
|
||||||
|
"source": [
|
||||||
|
"zeus.meta.to_dict()"
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"cell_type": "markdown",
|
"cell_type": "markdown",
|
||||||
"source": [
|
|
||||||
"To list all entries, with metadata"
|
|
||||||
],
|
|
||||||
"metadata": {
|
"metadata": {
|
||||||
"id": "Tq96ubMoPWoV"
|
"id": "Tq96ubMoPWoV"
|
||||||
}
|
},
|
||||||
|
"source": [
|
||||||
|
"To list all entries, with metadata"
|
||||||
|
]
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"cell_type": "code",
|
"cell_type": "code",
|
||||||
"source": [
|
"execution_count": 10,
|
||||||
"for name,cluster_value in cluster_malpedia.cluster_values.items():\n",
|
|
||||||
" obj_dict = cluster_value.to_dict()\n",
|
|
||||||
" if 'meta' in obj_dict:\n",
|
|
||||||
" meta = obj_dict['meta'].to_dict()\n",
|
|
||||||
" print(name, meta)"
|
|
||||||
],
|
|
||||||
"metadata": {
|
"metadata": {
|
||||||
"colab": {
|
"colab": {
|
||||||
"base_uri": "https://localhost:8080/"
|
"base_uri": "https://localhost:8080/"
|
||||||
|
@ -287,11 +280,10 @@
|
||||||
"id": "rWcAjS6ZPVn_",
|
"id": "rWcAjS6ZPVn_",
|
||||||
"outputId": "ac25600c-fdd7-460c-835d-c6d6b4bfda60"
|
"outputId": "ac25600c-fdd7-460c-835d-c6d6b4bfda60"
|
||||||
},
|
},
|
||||||
"execution_count": 10,
|
|
||||||
"outputs": [
|
"outputs": [
|
||||||
{
|
{
|
||||||
"output_type": "stream",
|
|
||||||
"name": "stdout",
|
"name": "stdout",
|
||||||
|
"output_type": "stream",
|
||||||
"text": [
|
"text": [
|
||||||
"FastCash {'refs': ['https://malpedia.caad.fkie.fraunhofer.de/details/aix.fastcash', 'https://blog.lexfo.fr/ressources/Lexfo-WhitePaper-The_Lazarus_Constellation.pdf', 'https://github.com/fboldewin/FastCashMalwareDissected/', 'https://www.cisa.gov/uscert/ncas/alerts/TA18-275A', 'https://www.cisa.gov/uscert/ncas/alerts/aa20-239a', 'https://mal-eats.net/en/2021/05/11/campo_new_attack_campaign_targeting_japan/', 'https://www.youtube.com/watch?v=zGvQPtejX9w', 'https://www.us-cert.gov/ncas/alerts/TA18-275A', 'https://blog.talosintelligence.com/2019/05/10-years-of-virtual-dynamite.html', 'https://threatrecon.nshc.net/2019/01/23/sectora01-custom-proxy-utility-tool-analysis/', 'https://i.blackhat.com/USA-20/Wednesday/us-20-Perlow-FASTCash-And-INJX_Pure-How-Threat-Actors-Use-Public-Standards-For-Financial-Fraud.pdf', 'https://i.blackhat.com/USA-20/Wednesday/us-20-Perlow-FASTCash-And-INJX_Pure-How-Threat-Actors-Use-Public-Standards-For-Financial-Fraud-wp.pdf', 'https://www.cisa.gov/uscert/sites/default/files/publications/AA22-108A-TraderTraitor-North_Korea_APT_Targets_Blockchain_Companies.pdf', 'https://www.youtube.com/watch?v=LUxOcpIRxmg', 'https://i.blackhat.com/eu-20/Wednesday/eu-20-Rivera-From-Zero-To-Sixty-The-Story-Of-North-Koreas-Rapid-Ascent-To-Becoming-A-Global-Cyber-Superpower.pdf', 'https://symantec-blogs.broadcom.com/blogs/threat-intelligence/fastcash-lazarus-atm-malware', 'https://www.symantec.com/blogs/threat-intelligence/fastcash-lazarus-atm-malware']}\n",
|
"FastCash {'refs': ['https://malpedia.caad.fkie.fraunhofer.de/details/aix.fastcash', 'https://blog.lexfo.fr/ressources/Lexfo-WhitePaper-The_Lazarus_Constellation.pdf', 'https://github.com/fboldewin/FastCashMalwareDissected/', 'https://www.cisa.gov/uscert/ncas/alerts/TA18-275A', 'https://www.cisa.gov/uscert/ncas/alerts/aa20-239a', 'https://mal-eats.net/en/2021/05/11/campo_new_attack_campaign_targeting_japan/', 'https://www.youtube.com/watch?v=zGvQPtejX9w', 'https://www.us-cert.gov/ncas/alerts/TA18-275A', 'https://blog.talosintelligence.com/2019/05/10-years-of-virtual-dynamite.html', 'https://threatrecon.nshc.net/2019/01/23/sectora01-custom-proxy-utility-tool-analysis/', 'https://i.blackhat.com/USA-20/Wednesday/us-20-Perlow-FASTCash-And-INJX_Pure-How-Threat-Actors-Use-Public-Standards-For-Financial-Fraud.pdf', 'https://i.blackhat.com/USA-20/Wednesday/us-20-Perlow-FASTCash-And-INJX_Pure-How-Threat-Actors-Use-Public-Standards-For-Financial-Fraud-wp.pdf', 'https://www.cisa.gov/uscert/sites/default/files/publications/AA22-108A-TraderTraitor-North_Korea_APT_Targets_Blockchain_Companies.pdf', 'https://www.youtube.com/watch?v=LUxOcpIRxmg', 'https://i.blackhat.com/eu-20/Wednesday/eu-20-Rivera-From-Zero-To-Sixty-The-Story-Of-North-Koreas-Rapid-Ascent-To-Becoming-A-Global-Cyber-Superpower.pdf', 'https://symantec-blogs.broadcom.com/blogs/threat-intelligence/fastcash-lazarus-atm-malware', 'https://www.symantec.com/blogs/threat-intelligence/fastcash-lazarus-atm-malware']}\n",
|
||||||
"888 RAT {'refs': ['https://malpedia.caad.fkie.fraunhofer.de/details/apk.888_rat', 'https://www.welivesecurity.com/2021/09/07/bladehawk-android-espionage-kurdish/']}\n",
|
"888 RAT {'refs': ['https://malpedia.caad.fkie.fraunhofer.de/details/apk.888_rat', 'https://www.welivesecurity.com/2021/09/07/bladehawk-android-espionage-kurdish/']}\n",
|
||||||
|
@ -2869,7 +2861,39 @@
|
||||||
"Zyklon {'refs': ['https://malpedia.caad.fkie.fraunhofer.de/details/win.zyklon', 'https://www.fireeye.com/blog/threat-research/2018/01/microsoft-office-vulnerabilities-used-to-distribute-zyklon-malware.html', 'https://blog.talosintelligence.com/2017/05/modified-zyklon-and-plugins-from-india.html']}\n"
|
"Zyklon {'refs': ['https://malpedia.caad.fkie.fraunhofer.de/details/win.zyklon', 'https://www.fireeye.com/blog/threat-research/2018/01/microsoft-office-vulnerabilities-used-to-distribute-zyklon-malware.html', 'https://blog.talosintelligence.com/2017/05/modified-zyklon-and-plugins-from-india.html']}\n"
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
],
|
||||||
|
"source": [
|
||||||
|
"for name,cluster_value in cluster_malpedia.cluster_values.items():\n",
|
||||||
|
" obj_dict = cluster_value.to_dict()\n",
|
||||||
|
" if 'meta' in obj_dict:\n",
|
||||||
|
" meta = obj_dict['meta'].to_dict()\n",
|
||||||
|
" print(name, meta)"
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
]
|
],
|
||||||
|
"metadata": {
|
||||||
|
"colab": {
|
||||||
|
"authorship_tag": "ABX9TyOFSmnINQ4YRBroomWdb+/2",
|
||||||
|
"include_colab_link": true,
|
||||||
|
"provenance": []
|
||||||
|
},
|
||||||
|
"kernelspec": {
|
||||||
|
"display_name": "Python 3",
|
||||||
|
"name": "python3"
|
||||||
|
},
|
||||||
|
"language_info": {
|
||||||
|
"codemirror_mode": {
|
||||||
|
"name": "ipython",
|
||||||
|
"version": 3
|
||||||
|
},
|
||||||
|
"file_extension": ".py",
|
||||||
|
"mimetype": "text/x-python",
|
||||||
|
"name": "python",
|
||||||
|
"nbconvert_exporter": "python",
|
||||||
|
"pygments_lexer": "ipython3",
|
||||||
|
"version": "3.12.3"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"nbformat": 4,
|
||||||
|
"nbformat_minor": 0
|
||||||
}
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
from .api import Galaxies, Clusters, EncodeGalaxies, EncodeClusters, UnableToRevertMachinetag
|
from .api import Galaxies, Galaxy, Clusters, Cluster, ClusterValue, EncodeGalaxies, EncodeClusters, UnableToRevertMachinetag
|
||||||
|
|
|
@ -47,8 +47,35 @@ class UnableToRevertMachinetag(PyMISPGalaxiesError):
|
||||||
|
|
||||||
|
|
||||||
class Galaxy():
|
class Galaxy():
|
||||||
|
"""
|
||||||
|
Represents a galaxy in the PyMISPGalaxies library.
|
||||||
|
|
||||||
def __init__(self, galaxy: Dict[str, str]):
|
Attributes:
|
||||||
|
galaxy (Dict[str, str]): The dictionary containing the galaxy data.
|
||||||
|
type (str): The type of the galaxy.
|
||||||
|
name (str): The name of the galaxy.
|
||||||
|
icon (str): The icon of the galaxy.
|
||||||
|
description (str): The description of the galaxy.
|
||||||
|
version (str): The version of the galaxy.
|
||||||
|
uuid (str): The UUID of the galaxy.
|
||||||
|
namespace (str, optional): The namespace of the galaxy.
|
||||||
|
kill_chain_order (str, optional): The kill chain order of the galaxy.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, galaxy: Union[str, Dict[str, str]]):
|
||||||
|
"""
|
||||||
|
Initializes a Galaxy object from an existing galaxy.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
galaxy (str): The name of the existing galaxy to load from the data folder.
|
||||||
|
galaxy (Dict[str, str]): The dictionary containing the galaxy data.
|
||||||
|
"""
|
||||||
|
if isinstance(galaxy, str):
|
||||||
|
root_dir_galaxies = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), 'data', 'misp-galaxy', 'galaxies') # type: ignore [type-var, arg-type]
|
||||||
|
galaxy_file = os.path.join(root_dir_galaxies, f"{galaxy}.json")
|
||||||
|
with open(galaxy_file, 'r') as f:
|
||||||
|
self.galaxy = json.load(f)
|
||||||
|
else:
|
||||||
self.galaxy = galaxy
|
self.galaxy = galaxy
|
||||||
self.type = self.galaxy['type']
|
self.type = self.galaxy['type']
|
||||||
self.name = self.galaxy['name']
|
self.name = self.galaxy['name']
|
||||||
|
@ -59,10 +86,35 @@ class Galaxy():
|
||||||
self.namespace = self.galaxy.pop('namespace', None)
|
self.namespace = self.galaxy.pop('namespace', None)
|
||||||
self.kill_chain_order = self.galaxy.pop('kill_chain_order', None)
|
self.kill_chain_order = self.galaxy.pop('kill_chain_order', None)
|
||||||
|
|
||||||
|
def save(self, name: str) -> None:
|
||||||
|
"""
|
||||||
|
Saves the galaxy to a file <name>.json
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name (str): The name of the file to save the galaxy to.
|
||||||
|
"""
|
||||||
|
root_dir_galaxies = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), 'data', 'misp-galaxy', 'galaxies') # type: ignore [type-var, arg-type]
|
||||||
|
galaxy_file = os.path.join(root_dir_galaxies, f"{name}.json")
|
||||||
|
with open(galaxy_file, 'w') as f:
|
||||||
|
json.dump(self, f, cls=EncodeGalaxies, indent=2, sort_keys=True, ensure_ascii=False)
|
||||||
|
f.write('\n') # needed for the beauty and to be compliant with jq_all_the_things
|
||||||
|
|
||||||
def to_json(self) -> str:
|
def to_json(self) -> str:
|
||||||
|
"""
|
||||||
|
Converts the galaxy object to a JSON string.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: The JSON representation of the galaxy object.
|
||||||
|
"""
|
||||||
return json.dumps(self, cls=EncodeGalaxies)
|
return json.dumps(self, cls=EncodeGalaxies)
|
||||||
|
|
||||||
def to_dict(self) -> Dict[str, str]:
|
def to_dict(self) -> Dict[str, str]:
|
||||||
|
"""
|
||||||
|
Converts the galaxy object to a dictionary.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict[str, str]: The dictionary representation of the galaxy object.
|
||||||
|
"""
|
||||||
to_return = {'type': self.type, 'name': self.name, 'description': self.description,
|
to_return = {'type': self.type, 'name': self.name, 'description': self.description,
|
||||||
'version': self.version, 'uuid': self.uuid, 'icon': self.icon}
|
'version': self.version, 'uuid': self.uuid, 'icon': self.icon}
|
||||||
if self.namespace:
|
if self.namespace:
|
||||||
|
@ -73,8 +125,32 @@ class Galaxy():
|
||||||
|
|
||||||
|
|
||||||
class Galaxies(Mapping): # type: ignore
|
class Galaxies(Mapping): # type: ignore
|
||||||
|
"""
|
||||||
|
A class representing a collection of MISP galaxies.
|
||||||
|
|
||||||
def __init__(self, galaxies: List[Dict[str, str]]=[]):
|
Parameters:
|
||||||
|
- galaxies: A list of dictionaries representing the galaxies. Each dictionary should contain the name and other properties of a galaxy.
|
||||||
|
If left empty, the galaxies are loaded from the data folder.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
- galaxies: A dictionary containing the galaxies, where the keys are the names of the galaxies and the values are instances of the Galaxy class.
|
||||||
|
- root_dir_galaxies: The root directory of the MISP galaxies.
|
||||||
|
|
||||||
|
Methods:
|
||||||
|
- validate_with_schema: Validates the galaxies against the schema.
|
||||||
|
- __getitem__: Returns the galaxy with the specified name.
|
||||||
|
- __iter__: Returns an iterator over the galaxy names.
|
||||||
|
- __len__: Returns the number of galaxies in the collection.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, galaxies: List[Dict[str, str]] = []):
|
||||||
|
"""
|
||||||
|
Initializes a new instance of the Galaxies class.
|
||||||
|
|
||||||
|
Parameters:
|
||||||
|
- galaxies: A list of dictionaries representing the galaxies. Each dictionary should contain the name and other properties of a galaxy.
|
||||||
|
If left empty, the galaxies are loaded from the data folder.
|
||||||
|
"""
|
||||||
if not galaxies:
|
if not galaxies:
|
||||||
galaxies = []
|
galaxies = []
|
||||||
self.root_dir_galaxies = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), # type: ignore
|
self.root_dir_galaxies = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), # type: ignore
|
||||||
|
@ -88,6 +164,12 @@ class Galaxies(Mapping): # type: ignore
|
||||||
self.galaxies[galaxy['name']] = Galaxy(galaxy)
|
self.galaxies[galaxy['name']] = Galaxy(galaxy)
|
||||||
|
|
||||||
def validate_with_schema(self) -> None:
|
def validate_with_schema(self) -> None:
|
||||||
|
"""
|
||||||
|
Validates the galaxies against the schema.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
- ImportError: If the jsonschema module is not installed.
|
||||||
|
"""
|
||||||
if not HAS_JSONSCHEMA:
|
if not HAS_JSONSCHEMA:
|
||||||
raise ImportError('jsonschema is required: pip install jsonschema')
|
raise ImportError('jsonschema is required: pip install jsonschema')
|
||||||
schema = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), # type: ignore
|
schema = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), # type: ignore
|
||||||
|
@ -98,12 +180,36 @@ class Galaxies(Mapping): # type: ignore
|
||||||
jsonschema.validate(g.galaxy, loaded_schema)
|
jsonschema.validate(g.galaxy, loaded_schema)
|
||||||
|
|
||||||
def __getitem__(self, name: str) -> Galaxy:
|
def __getitem__(self, name: str) -> Galaxy:
|
||||||
|
"""
|
||||||
|
Returns the galaxy with the specified name.
|
||||||
|
|
||||||
|
Parameters:
|
||||||
|
- name: The name of the galaxy.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
- The Galaxy instance with the specified name.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
- KeyError: If the galaxy with the specified name does not exist.
|
||||||
|
"""
|
||||||
return self.galaxies[name]
|
return self.galaxies[name]
|
||||||
|
|
||||||
def __iter__(self) -> Iterator[str]:
|
def __iter__(self) -> Iterator[str]:
|
||||||
|
"""
|
||||||
|
Returns an iterator over the galaxy names.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
- An iterator over the galaxy names.
|
||||||
|
"""
|
||||||
return iter(self.galaxies)
|
return iter(self.galaxies)
|
||||||
|
|
||||||
def __len__(self) -> int:
|
def __len__(self) -> int:
|
||||||
|
"""
|
||||||
|
Returns the number of galaxies in the collection.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
- The number of galaxies in the collection.
|
||||||
|
"""
|
||||||
return len(self.galaxies)
|
return len(self.galaxies)
|
||||||
|
|
||||||
|
|
||||||
|
@ -173,14 +279,45 @@ class ClusterValueMeta():
|
||||||
|
|
||||||
|
|
||||||
class ClusterValue():
|
class ClusterValue():
|
||||||
|
"""
|
||||||
|
Represents a cluster value.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
uuid (str): The UUID of the cluster value.
|
||||||
|
value (Any): The value of the cluster.
|
||||||
|
description (str): The description of the cluster value.
|
||||||
|
meta (ClusterValueMeta): The metadata associated with the cluster value.
|
||||||
|
searchable (List[str]): A list of searchable terms for the cluster value.
|
||||||
|
|
||||||
|
Methods:
|
||||||
|
__init__(self, v: Dict[str, Any]): Initializes a ClusterValue object.
|
||||||
|
__init_meta(self, m: Optional[Dict[str, str]]) -> Optional[ClusterValueMeta]: Initializes the metadata for the cluster value.
|
||||||
|
to_json(self) -> str: Converts the ClusterValue object to a JSON string.
|
||||||
|
to_dict(self) -> Dict[str, Any]: Converts the ClusterValue object to a dictionary.
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(self, v: Dict[str, Any]):
|
def __init__(self, v: Dict[str, Any]):
|
||||||
|
"""
|
||||||
|
Initializes a ClusterValue object.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
v (Dict[str, Any]): A dictionary containing the cluster value information.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
PyMISPGalaxiesError: If the cluster value is invalid (no value).
|
||||||
|
"""
|
||||||
if not v['value']:
|
if not v['value']:
|
||||||
raise PyMISPGalaxiesError("Invalid cluster (no value): {}".format(v))
|
raise PyMISPGalaxiesError("Invalid cluster (no value): {}".format(v))
|
||||||
self.uuid = v.get('uuid', None)
|
self.uuid = v.get('uuid', None)
|
||||||
self.value = v['value']
|
self.value = v['value']
|
||||||
self.description = v.get('description')
|
self.description = v.get('description')
|
||||||
self.meta = self.__init_meta(v.get('meta'))
|
self.meta = self.__init_meta(v.get('meta'))
|
||||||
|
self.related = []
|
||||||
|
try:
|
||||||
|
# LATER convert related to a class?
|
||||||
|
self.related = v['related']
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
self.searchable = [self.value]
|
self.searchable = [self.value]
|
||||||
if self.uuid:
|
if self.uuid:
|
||||||
self.searchable.append(self.uuid)
|
self.searchable.append(self.uuid)
|
||||||
|
@ -189,14 +326,35 @@ class ClusterValue():
|
||||||
self.searchable = list(set(self.searchable))
|
self.searchable = list(set(self.searchable))
|
||||||
|
|
||||||
def __init_meta(self, m: Optional[Dict[str, str]]) -> Optional[ClusterValueMeta]:
|
def __init_meta(self, m: Optional[Dict[str, str]]) -> Optional[ClusterValueMeta]:
|
||||||
|
"""
|
||||||
|
Initializes the metadata for the cluster value.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
m (Optional[Dict[str, str]]): A dictionary containing the metadata for the cluster value.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Optional[ClusterValueMeta]: The initialized ClusterValueMeta object or None if no metadata is provided.
|
||||||
|
"""
|
||||||
if not m:
|
if not m:
|
||||||
return None
|
return None
|
||||||
return ClusterValueMeta(m)
|
return ClusterValueMeta(m)
|
||||||
|
|
||||||
def to_json(self) -> str:
|
def to_json(self) -> str:
|
||||||
|
"""
|
||||||
|
Converts the ClusterValue object to a JSON string.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: The JSON representation of the ClusterValue object.
|
||||||
|
"""
|
||||||
return json.dumps(self, cls=EncodeClusters)
|
return json.dumps(self, cls=EncodeClusters)
|
||||||
|
|
||||||
def to_dict(self) -> Dict[str, Any]:
|
def to_dict(self) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Converts the ClusterValue object to a dictionary.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict[str, Any]: The dictionary representation of the ClusterValue object.
|
||||||
|
"""
|
||||||
to_return = {'value': self.value}
|
to_return = {'value': self.value}
|
||||||
if self.uuid:
|
if self.uuid:
|
||||||
to_return['uuid'] = self.uuid
|
to_return['uuid'] = self.uuid
|
||||||
|
@ -204,12 +362,58 @@ class ClusterValue():
|
||||||
to_return['description'] = self.description
|
to_return['description'] = self.description
|
||||||
if self.meta:
|
if self.meta:
|
||||||
to_return['meta'] = self.meta
|
to_return['meta'] = self.meta
|
||||||
|
if self.related:
|
||||||
|
to_return['related'] = self.related
|
||||||
return to_return
|
return to_return
|
||||||
|
|
||||||
|
|
||||||
class Cluster(Mapping): # type: ignore
|
class Cluster(Mapping): # type: ignore
|
||||||
|
"""
|
||||||
|
Represents a cluster in the PyMISPGalaxies library.
|
||||||
|
|
||||||
def __init__(self, cluster: Dict[str, Any], skip_duplicates: bool=False):
|
Attributes:
|
||||||
|
cluster (Dict[str, Any]): The dictionary containing the cluster data.
|
||||||
|
cluster (str): The name of the existing cluster to load from the data folder.
|
||||||
|
name (str): The name of the cluster.
|
||||||
|
type (str): The type of the cluster.
|
||||||
|
source (str): The source of the cluster.
|
||||||
|
authors (str): The authors of the cluster.
|
||||||
|
description (str): The description of the cluster.
|
||||||
|
uuid (str): The UUID of the cluster.
|
||||||
|
version (str): The version of the cluster.
|
||||||
|
category (str): The category of the cluster.
|
||||||
|
cluster_values (Dict[str, ClusterValue]): A dictionary containing the cluster values, where the keys are the values of the cluster and the values are instances of the ClusterValue class.
|
||||||
|
duplicates (List[Tuple[str, str]]): A list of tuples representing duplicate values in the cluster, where each tuple contains the name of the cluster and the duplicate value.
|
||||||
|
|
||||||
|
Methods:
|
||||||
|
__init__(self, cluster: Dict[str, Any] | str, skip_duplicates: bool = False): Initializes a Cluster object from a dict or existing cluster file
|
||||||
|
search(self, query: str, return_tags: bool = False) -> Union[List[ClusterValue], List[str]]: Searches for values in the cluster that match the given query.
|
||||||
|
machinetags(self) -> List[str]: Returns a list of machine tags for the cluster.
|
||||||
|
get_by_external_id(self, external_id: str) -> ClusterValue: Returns the cluster value with the specified external ID.
|
||||||
|
save(self, name:str) -> None: Saves the cluster to a file <name>.json
|
||||||
|
__str__(self) -> str: Returns a string representation of the cluster.
|
||||||
|
__getitem__(self, name: str) -> ClusterValue: Returns the cluster value with the specified name.
|
||||||
|
__len__(self) -> int: Returns the number of cluster values in the cluster.
|
||||||
|
__iter__(self) -> Iterator[str]: Returns an iterator over the cluster values.
|
||||||
|
to_json(self) -> str: Converts the Cluster object to a JSON string.
|
||||||
|
to_dict(self) -> Dict[str, Any]: Converts the Cluster object to a dictionary.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, cluster: Union[Dict[str, Any], str], skip_duplicates: bool = False):
|
||||||
|
"""
|
||||||
|
Initializes a Cluster object from an existing cluster.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
cluster (str): The name of the existing cluster to load from the data folder.
|
||||||
|
cluster (Dict[str, Any]): A dictionary containing the cluster data.
|
||||||
|
skip_duplicates (bool, optional): Flag indicating whether to skip duplicate values. Defaults to False.
|
||||||
|
"""
|
||||||
|
if isinstance(cluster, str):
|
||||||
|
root_dir_clusters = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), 'data', 'misp-galaxy', 'clusters') # type: ignore [type-var, arg-type]
|
||||||
|
cluster_file = os.path.join(root_dir_clusters, f"{cluster}.json")
|
||||||
|
with open(cluster_file, 'r') as f:
|
||||||
|
self.cluster = json.load(f)
|
||||||
|
else:
|
||||||
self.cluster = cluster
|
self.cluster = cluster
|
||||||
self.name = self.cluster['name']
|
self.name = self.cluster['name']
|
||||||
self.type = self.cluster['type']
|
self.type = self.cluster['type']
|
||||||
|
@ -219,19 +423,17 @@ class Cluster(Mapping): # type: ignore
|
||||||
self.uuid = self.cluster['uuid']
|
self.uuid = self.cluster['uuid']
|
||||||
self.version = self.cluster['version']
|
self.version = self.cluster['version']
|
||||||
self.category = self.cluster['category']
|
self.category = self.cluster['category']
|
||||||
self.cluster_values = {}
|
self.cluster_values: Dict[str, Any] = {}
|
||||||
self.duplicates = []
|
self.duplicates: List[Tuple[str, str]] = []
|
||||||
|
try:
|
||||||
for value in self.cluster['values']:
|
for value in self.cluster['values']:
|
||||||
new_cluster_value = ClusterValue(value)
|
new_cluster_value = ClusterValue(value)
|
||||||
if self.get(new_cluster_value.value):
|
self.append(new_cluster_value, skip_duplicates)
|
||||||
if skip_duplicates:
|
except KeyError:
|
||||||
self.duplicates.append((self.name, new_cluster_value.value))
|
pass
|
||||||
else:
|
|
||||||
raise PyMISPGalaxiesError("Duplicate value ({}) in cluster: {}".format(new_cluster_value.value, self.name))
|
|
||||||
self.cluster_values[new_cluster_value.value] = new_cluster_value
|
|
||||||
|
|
||||||
@overload
|
@overload
|
||||||
def search(self, query: str, return_tags: Literal[False]=False) -> List[ClusterValue]:
|
def search(self, query: str, return_tags: Literal[False] = False) -> List[ClusterValue]:
|
||||||
...
|
...
|
||||||
|
|
||||||
@overload
|
@overload
|
||||||
|
@ -242,39 +444,160 @@ class Cluster(Mapping): # type: ignore
|
||||||
def search(self, query: str, return_tags: bool) -> Union[List[ClusterValue], List[str]]:
|
def search(self, query: str, return_tags: bool) -> Union[List[ClusterValue], List[str]]:
|
||||||
...
|
...
|
||||||
|
|
||||||
def search(self, query: str, return_tags: bool=False) -> Union[List[ClusterValue], List[str]]:
|
def search(self, query: str, return_tags: bool = False) -> Union[List[ClusterValue], List[str]]:
|
||||||
|
"""
|
||||||
|
Searches for values in the cluster that match the given query.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
query (str): The query to search for.
|
||||||
|
return_tags (bool, optional): Flag indicating whether to return machine tags instead of cluster values. Defaults to False.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Union[List[ClusterValue], List[str]]: A list of matching cluster values or machine tags.
|
||||||
|
"""
|
||||||
matching = []
|
matching = []
|
||||||
for v in self.values():
|
for v in self.values():
|
||||||
if [s for s in v.searchable if query.lower() in s.lower()]:
|
if [s for s in v.searchable if query.lower() in s.lower()]:
|
||||||
if return_tags:
|
if return_tags:
|
||||||
matching.append('misp-galaxy:{}="{}"'.format(self.type, v.value))
|
matching.append('misp-galaxy:{}="{}"'.format(self.type, v.value))
|
||||||
pass
|
|
||||||
else:
|
else:
|
||||||
matching.append(v)
|
matching.append(v)
|
||||||
return matching
|
return matching
|
||||||
|
|
||||||
def machinetags(self) -> List[str]:
|
def machinetags(self) -> List[str]:
|
||||||
|
"""
|
||||||
|
Returns a list of machine tags for the cluster.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List[str]: A list of machine tags.
|
||||||
|
"""
|
||||||
to_return = []
|
to_return = []
|
||||||
for v in self.values():
|
for v in self.values():
|
||||||
to_return.append('misp-galaxy:{}="{}"'.format(self.type, v.value))
|
to_return.append('misp-galaxy:{}="{}"'.format(self.type, v.value))
|
||||||
return to_return
|
return to_return
|
||||||
|
|
||||||
|
def get_by_external_id(self, external_id: str) -> ClusterValue:
|
||||||
|
"""
|
||||||
|
Returns the cluster value with the specified external ID.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
external_id (str): The external ID to search for.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ClusterValue: The cluster value with the specified external ID.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
KeyError: If no value with the specified external ID is found.
|
||||||
|
"""
|
||||||
|
for value in self.cluster_values.values():
|
||||||
|
if value.meta and value.meta.additional_properties and value.meta.additional_properties.get('external_id') == external_id:
|
||||||
|
return value
|
||||||
|
raise KeyError('No value with external_id: {}'.format(external_id))
|
||||||
|
|
||||||
|
def get_kill_chain_tactics(self) -> Dict[str, List[str]]:
|
||||||
|
"""
|
||||||
|
Returns the sorted kill chain tactics associated with the cluster.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List[str]: A list of kill chain tactics.
|
||||||
|
"""
|
||||||
|
items = set()
|
||||||
|
for v in self.cluster_values.values():
|
||||||
|
if v.meta and v.meta.additional_properties and v.meta.additional_properties.get('kill_chain'):
|
||||||
|
for item in v.meta.additional_properties.get('kill_chain'):
|
||||||
|
items.add(item)
|
||||||
|
result: Dict[str, List[str]] = {}
|
||||||
|
for item in items:
|
||||||
|
key, value = item.split(':')
|
||||||
|
if key not in result:
|
||||||
|
result[key] = []
|
||||||
|
result[key].append(value)
|
||||||
|
|
||||||
|
for key in result.keys():
|
||||||
|
result[key] = sorted(result[key])
|
||||||
|
return result
|
||||||
|
|
||||||
|
def append(self, cv: Union[Dict[str, Any], ClusterValue], skip_duplicates: bool = False) -> None:
|
||||||
|
"""
|
||||||
|
Adds a cluster value to the cluster.
|
||||||
|
"""
|
||||||
|
if isinstance(cv, dict):
|
||||||
|
cv = ClusterValue(cv)
|
||||||
|
if self.get(cv.value):
|
||||||
|
if skip_duplicates:
|
||||||
|
self.duplicates.append((self.name, cv.value))
|
||||||
|
else:
|
||||||
|
raise PyMISPGalaxiesError("Duplicate value ({}) in cluster: {}".format(cv.value, self.name))
|
||||||
|
self.cluster_values[cv.value] = cv
|
||||||
|
|
||||||
|
def save(self, name: str) -> None:
|
||||||
|
"""
|
||||||
|
Saves the cluster to a file <name>.json
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name (str): The name of the file to save the cluster to.
|
||||||
|
"""
|
||||||
|
root_dir_clusters = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), 'data', 'misp-galaxy', 'clusters') # type: ignore [type-var, arg-type]
|
||||||
|
cluster_file = os.path.join(root_dir_clusters, f"{name}.json")
|
||||||
|
with open(cluster_file, 'w') as f:
|
||||||
|
json.dump(self, f, cls=EncodeClusters, indent=2, sort_keys=True, ensure_ascii=False)
|
||||||
|
f.write('\n') # needed for the beauty and to be compliant with jq_all_the_things
|
||||||
|
|
||||||
def __str__(self) -> str:
|
def __str__(self) -> str:
|
||||||
|
"""
|
||||||
|
Returns a string representation of the cluster.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: A string representation of the cluster.
|
||||||
|
"""
|
||||||
return '\n'.join(self.machinetags())
|
return '\n'.join(self.machinetags())
|
||||||
|
|
||||||
def __getitem__(self, name: str) -> ClusterValue:
|
def __getitem__(self, name: str) -> ClusterValue:
|
||||||
|
"""
|
||||||
|
Returns the cluster value with the specified name.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name (str): The name of the cluster value.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ClusterValue: The cluster value with the specified name.
|
||||||
|
"""
|
||||||
return self.cluster_values[name]
|
return self.cluster_values[name]
|
||||||
|
|
||||||
def __len__(self) -> int:
|
def __len__(self) -> int:
|
||||||
|
"""
|
||||||
|
Returns the number of cluster values in the cluster.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int: The number of cluster values.
|
||||||
|
"""
|
||||||
return len(self.cluster_values)
|
return len(self.cluster_values)
|
||||||
|
|
||||||
def __iter__(self) -> Iterator[str]:
|
def __iter__(self) -> Iterator[str]:
|
||||||
|
"""
|
||||||
|
Returns an iterator over the cluster values.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Iterator[str]: An iterator over the cluster values.
|
||||||
|
"""
|
||||||
return iter(self.cluster_values)
|
return iter(self.cluster_values)
|
||||||
|
|
||||||
def to_json(self) -> str:
|
def to_json(self) -> str:
|
||||||
|
"""
|
||||||
|
Converts the Cluster object to a JSON string.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: The JSON representation of the Cluster object.
|
||||||
|
"""
|
||||||
return json.dumps(self, cls=EncodeClusters)
|
return json.dumps(self, cls=EncodeClusters)
|
||||||
|
|
||||||
def to_dict(self) -> Dict[str, Any]:
|
def to_dict(self) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Converts the Cluster object to a dictionary.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict[str, Any]: The dictionary representation of the Cluster object.
|
||||||
|
"""
|
||||||
to_return = {'name': self.name, 'type': self.type, 'source': self.source,
|
to_return = {'name': self.name, 'type': self.type, 'source': self.source,
|
||||||
'authors': self.authors, 'description': self.description,
|
'authors': self.authors, 'description': self.description,
|
||||||
'uuid': self.uuid, 'version': self.version, 'category': self.category,
|
'uuid': self.uuid, 'version': self.version, 'category': self.category,
|
||||||
|
@ -285,7 +608,14 @@ class Cluster(Mapping): # type: ignore
|
||||||
|
|
||||||
class Clusters(Mapping): # type: ignore
|
class Clusters(Mapping): # type: ignore
|
||||||
|
|
||||||
def __init__(self, clusters: List[Dict[str, str]]=[], skip_duplicates: bool=False):
|
def __init__(self, clusters: List[Dict[str, str]] = [], skip_duplicates: bool = False):
|
||||||
|
"""
|
||||||
|
Allows to interact with a group of clusters.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
clusters (List[Dict[str, str]], optional): A list of dictionaries representing clusters. If left empty, load the clusters from the data folder.
|
||||||
|
skip_duplicates (bool, optional): Flag indicating whether to skip duplicate clusters. Defaults to False.
|
||||||
|
"""
|
||||||
if not clusters:
|
if not clusters:
|
||||||
clusters = []
|
clusters = []
|
||||||
self.root_dir_clusters = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), # type: ignore
|
self.root_dir_clusters = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), # type: ignore
|
||||||
|
@ -298,6 +628,12 @@ class Clusters(Mapping): # type: ignore
|
||||||
self.clusters[cluster['type']] = Cluster(cluster, skip_duplicates=skip_duplicates)
|
self.clusters[cluster['type']] = Cluster(cluster, skip_duplicates=skip_duplicates)
|
||||||
|
|
||||||
def validate_with_schema(self) -> None:
|
def validate_with_schema(self) -> None:
|
||||||
|
"""
|
||||||
|
Validates the clusters against the schema.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ImportError: If jsonschema is not installed.
|
||||||
|
"""
|
||||||
if not HAS_JSONSCHEMA:
|
if not HAS_JSONSCHEMA:
|
||||||
raise ImportError('jsonschema is required: pip install jsonschema')
|
raise ImportError('jsonschema is required: pip install jsonschema')
|
||||||
schema = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), # type: ignore
|
schema = os.path.join(os.path.abspath(os.path.dirname(sys.modules['pymispgalaxies'].__file__)), # type: ignore
|
||||||
|
@ -308,9 +644,27 @@ class Clusters(Mapping): # type: ignore
|
||||||
jsonschema.validate(c.cluster, loaded_schema)
|
jsonschema.validate(c.cluster, loaded_schema)
|
||||||
|
|
||||||
def all_machinetags(self) -> List[str]:
|
def all_machinetags(self) -> List[str]:
|
||||||
|
"""
|
||||||
|
Returns a list of all machinetags in the clusters.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List[str]: A list of machinetags.
|
||||||
|
"""
|
||||||
return [cluster.machinetags() for cluster in self.values()]
|
return [cluster.machinetags() for cluster in self.values()]
|
||||||
|
|
||||||
def revert_machinetag(self, machinetag: str) -> Tuple[Cluster, ClusterValue]:
|
def revert_machinetag(self, machinetag: str) -> Tuple[Cluster, ClusterValue]:
|
||||||
|
"""
|
||||||
|
Reverts a machinetag to its original cluster and value.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
machinetag (str): The machinetag to revert.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple[Cluster, ClusterValue]: A tuple containing the original cluster and value.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
UnableToRevertMachinetag: If the machinetag could not be found.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
_, cluster_type, cluster_value = re.findall('^([^:]*):([^=]*)="([^"]*)"$', machinetag)[0]
|
_, cluster_type, cluster_value = re.findall('^([^:]*):([^=]*)="([^"]*)"$', machinetag)[0]
|
||||||
cluster: Cluster = self[cluster_type]
|
cluster: Cluster = self[cluster_type]
|
||||||
|
@ -319,7 +673,18 @@ class Clusters(Mapping): # type: ignore
|
||||||
except Exception:
|
except Exception:
|
||||||
raise UnableToRevertMachinetag('The machinetag {} could not be found.'.format(machinetag))
|
raise UnableToRevertMachinetag('The machinetag {} could not be found.'.format(machinetag))
|
||||||
|
|
||||||
def search(self, query: str, return_tags: bool=False) -> List[Tuple[Cluster, str]]:
|
def search(self, query: str, return_tags: bool = False) -> List[Tuple[Cluster, str]]:
|
||||||
|
"""
|
||||||
|
Searches for clusters and values matching the given query.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
query (str): The query to search for.
|
||||||
|
return_tags (bool, optional): Flag indicating whether to return the matching tags. Defaults to False.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List[Tuple[Cluster, str]]: A list of tuples containing the matching cluster and value.
|
||||||
|
|
||||||
|
"""
|
||||||
to_return = []
|
to_return = []
|
||||||
for cluster in self.values():
|
for cluster in self.values():
|
||||||
values = cluster.search(query, return_tags)
|
values = cluster.search(query, return_tags)
|
||||||
|
@ -329,15 +694,45 @@ class Clusters(Mapping): # type: ignore
|
||||||
return to_return
|
return to_return
|
||||||
|
|
||||||
def __getitem__(self, name: str) -> Cluster:
|
def __getitem__(self, name: str) -> Cluster:
|
||||||
|
"""
|
||||||
|
Returns the cluster with the specified name.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name (str): The name of the cluster.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Cluster: The cluster object.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
KeyError: If the cluster with the specified name does not exist.
|
||||||
|
"""
|
||||||
return self.clusters[name]
|
return self.clusters[name]
|
||||||
|
|
||||||
def __iter__(self) -> Iterator[str]:
|
def __iter__(self) -> Iterator[str]:
|
||||||
|
"""
|
||||||
|
Returns an iterator over the cluster names.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Iterator[str]: An iterator over the cluster names.
|
||||||
|
"""
|
||||||
return iter(self.clusters)
|
return iter(self.clusters)
|
||||||
|
|
||||||
def __len__(self) -> int:
|
def __len__(self) -> int:
|
||||||
|
"""
|
||||||
|
Returns the number of clusters.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int: The number of clusters.
|
||||||
|
"""
|
||||||
return len(self.clusters)
|
return len(self.clusters)
|
||||||
|
|
||||||
def __str__(self) -> str:
|
def __str__(self) -> str:
|
||||||
|
"""
|
||||||
|
Returns a string representation of the Clusters object.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: A string representation of the Clusters object.
|
||||||
|
"""
|
||||||
to_print = ''
|
to_print = ''
|
||||||
for cluster in self.values():
|
for cluster in self.values():
|
||||||
to_print += '{}\n\n'.format(cluster)
|
to_print += '{}\n\n'.format(cluster)
|
||||||
|
|
|
@ -0,0 +1,23 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
from pymispgalaxies import Galaxies, Clusters, Cluster
|
||||||
|
|
||||||
|
|
||||||
|
class TestPyMISPGalaxiesApi(unittest.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.galaxies = Galaxies()
|
||||||
|
self.clusters = Clusters(skip_duplicates=False)
|
||||||
|
self.maxDiff = None
|
||||||
|
|
||||||
|
def test_get_by_external_id(self):
|
||||||
|
cluster = Cluster(cluster='mitre-attack-pattern')
|
||||||
|
self.assertIsNotNone(cluster)
|
||||||
|
cluster_by_external_id = cluster.get_by_external_id('T1525')
|
||||||
|
cluster_by_value = cluster.get('Implant Internal Image - T1525')
|
||||||
|
self.assertEqual(cluster_by_external_id, cluster_by_value)
|
||||||
|
|
||||||
|
with self.assertRaises(KeyError):
|
||||||
|
cluster.get_by_external_id('XXXXXX')
|
Loading…
Reference in New Issue