Merge branch 'main' of github.com:MISP/misp-modules

pull/693/head
Christian Studer 2024-01-30 15:03:18 +01:00
commit feb9c63ca7
No known key found for this signature in database
GPG Key ID: 6BBED1B63A6D639F
50 changed files with 2118 additions and 292 deletions

View File

@ -13,22 +13,18 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"]
python-version: ["3.8", "3.9", "3.10", "3.11"]
steps:
- name: Install packages
run: |
sudo apt-get install libpoppler-cpp-dev libzbar0 tesseract-ocr
- uses: actions/checkout@v2
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Cache Python dependencies
uses: actions/cache@v2
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ matrix.python-version }}-${{ hashFiles('REQUIREMENTS') }}
cache: 'pip'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
@ -42,12 +38,17 @@ jobs:
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Run server in background
run: |
misp-modules -l 127.0.0.1 -s 2>error.log &
sleep 3
- name: Check if server is running
run: |
curl -sS localhost:6666/healthcheck
- name: Test with pytest
run: |
# Run server in background
misp-modules -l 127.0.0.1 -s &
sleep 5
# Check if modules are running
curl -sS localhost:6666/modules
# Run tests
pytest tests
- name: Show error log
if: always()
run: |
cat error.log

View File

@ -13,8 +13,7 @@ For more information: [Extending MISP with Python modules](https://www.misp-proj
## Existing MISP modules
### Expansion modules
* [apiosintDS](misp_modules/modules/expansion/apiosintds.py) - a hover and expansion module to query the OSINT.digitalside.it API.
* [apiosintDS](misp_modules/modules/expansion/apiosintds.py) - a hover and expansion module to query the [OSINT.digitalside.it](https://osint.digitalside.it) API. [Documentation](https://apiosintds.readthedocs.io/en/latest/userguidemisp.html).
* [API Void](misp_modules/modules/expansion/apivoid.py) - an expansion and hover module to query API Void with a domain attribute.
* [AssemblyLine submit](misp_modules/modules/expansion/assemblyline_submit.py) - an expansion module to submit samples and urls to AssemblyLine.
* [AssemblyLine query](misp_modules/modules/expansion/assemblyline_query.py) - an expansion module to query AssemblyLine and parse the full submission report.
@ -50,6 +49,7 @@ For more information: [Extending MISP with Python modules](https://www.misp-proj
* [html_to_markdown](misp_modules/modules/expansion/html_to_markdown.py) - Simple HTML to markdown converter
* [HYAS Insight](misp_modules/modules/expansion/hyasinsight.py) - a hover and expansion module to get information from [HYAS Insight](https://www.hyas.com/hyas-insight).
* [intel471](misp_modules/modules/expansion/intel471.py) - an expansion module to get info from [Intel471](https://intel471.com).
* [IP2Location.io](misp_modules/modules/expansion/ip2locationio.py) - an expansion module to get additional information on an IP address using the IP2Location.io API
* [IPASN](misp_modules/modules/expansion/ipasn.py) - a hover and expansion to get the BGP ASN of an IP address.
* [ipinfo.io](misp_modules/modules/expansion/ipinfo.py) - an expansion module to get additional information on an IP address using the ipinfo.io API
* [iprep](misp_modules/modules/expansion/iprep.py) - an expansion module to get IP reputation from packetmail.net.
@ -95,7 +95,9 @@ For more information: [Extending MISP with Python modules](https://www.misp-proj
* [VMware NSX](misp_modules/modules/expansion/vmware_nsx.py) - a module to enrich a file or URL with VMware NSX Defender.
* [VulnDB](misp_modules/modules/expansion/vulndb.py) - a module to query [VulnDB](https://www.riskbasedsecurity.com/).
* [Vulners](misp_modules/modules/expansion/vulners.py) - an expansion module to expand information about CVEs using Vulners API.
* [Vysion](misp_modules/modules/expansion/vysion.py) - an expansion module to add dark web intelligence using Vysion API.
* [whois](misp_modules/modules/expansion/whois.py) - a module to query a local instance of [uwhois](https://github.com/rafiot/uwhoisd).
* [whoisfreaks](misp_modules/modules/expansion/whoisfreaks.py) - An expansion module for [whoisfreaks](https://whoisfreaks.com/) that will provide an enriched analysis of the provided domain, including WHOIS and DNS information.
* [wikidata](misp_modules/modules/expansion/wiki.py) - a [wikidata](https://www.wikidata.org) expansion module.
* [xforce](misp_modules/modules/expansion/xforceexchange.py) - an IBM X-Force Exchange expansion module.
* [xlsx-enrich](misp_modules/modules/expansion/xlsx_enrich.py) - an enrichment module to get text out of an Excel document into MISP (using free-text parser).

View File

@ -3,7 +3,7 @@ aiohttp==3.8.4
aiosignal==1.3.1 ; python_version >= '3.7'
antlr4-python3-runtime==4.9.3
anyio==3.6.2 ; python_full_version >= '3.6.2'
apiosintds==1.8.3
git+https://github.com/davidonzo/apiosintDS@misp
appdirs==1.4.4
argcomplete==3.0.8 ; python_version >= '3.6'
argparse==1.4.0
@ -14,7 +14,7 @@ attrs==23.1.0 ; python_version >= '3.7'
backoff==2.2.1 ; python_version >= '3.7' and python_version < '4.0'
backports.zoneinfo==0.2.1 ; python_version < '3.9'
backscatter==0.2.4
beautifulsoup4==4.11.2
beautifulsoup4==4.12.2
bidict==0.22.1 ; python_version >= '3.7'
blockchain==1.4.4
censys==2.2.2
@ -33,7 +33,7 @@ crowdstrike-falconpy==1.2.15
cryptography==40.0.2 ; python_version >= '3.6'
dateparser==1.1.8 ; python_version >= '3.7'
decorator==5.1.1 ; python_version >= '3.5'
deprecated==1.2.13 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'
deprecated==1.2.14 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'
dnsdb2==1.1.4
dnspython==2.3.0
domaintools-api==1.0.1
@ -41,7 +41,7 @@ easygui==0.98.3
ebcdic==1.1.1
enum-compat==0.0.3
et-xmlfile==1.1.0 ; python_version >= '3.6'
extract-msg==0.38.4
extract-msg==0.45.0
ezodf==0.3.2
filelock==3.12.0 ; python_version >= '3.7'
frozenlist==1.3.3 ; python_version >= '3.7'
@ -62,15 +62,17 @@ jbxapi==3.21.0
jeepney==0.8.0 ; sys_platform == 'linux'
jinja2==3.1.2
json-log-formatter==0.5.2 ; python_version >= '2.7'
jsonschema==4.17.3 ; python_version >= '3.7'
jsonschema==4.19.0 ; python_version >= '3.7'
keyring==23.13.1 ; python_version >= '3.7'
lark-parser==0.12.0
lief==0.12.3
lief==0.13.2
lxml==4.9.2
maclookup==1.0.3
markdown-it-py==2.2.0 ; python_version >= '3.7'
markdownify==0.5.3
markupsafe==2.1.2 ; python_version >= '3.7'
matplotlib==3.7.2 ; python_version >= '3.8'
matplotlib==3.5.3 ; python_version == '3.7'
mattermostdriver==7.3.2
maxminddb==2.3.0 ; python_version >= '3.7'
mdurl==0.1.2 ; python_version >= '3.7'
@ -89,8 +91,8 @@ oletools==0.60.1
opencv-python==4.7.0.72
openpyxl==3.1.2
packaging==23.1 ; python_version >= '3.7'
pandas==1.3.5
pandas-ods-reader==0.1.2
pandas==1.5.3
pandas-ods-reader==0.1.4
passivetotal==2.5.9
pcodedmp==1.2.6
pdftotext==2.2.2
@ -98,7 +100,7 @@ pillow==9.5.0
pkgutil-resolve-name==1.3.10 ; python_version < '3.9'
progressbar2==4.2.0 ; python_full_version >= '3.7.0'
psutil==5.9.5 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'
publicsuffixlist==0.9.4 ; python_version >= '2.6'
publicsuffixlist==0.10.0.20230828 ; python_version >= '2.6'
git+https://github.com/D4-project/BGP-Ranking.git/@68de39f6c5196f796055c1ac34504054d688aa59#egg=pybgpranking&subdirectory=client
pycountry==22.3.5
pycparser==2.21
@ -112,12 +114,13 @@ pygeoip==0.3.2
pygments==2.15.1 ; python_version >= '3.7'
git+https://github.com/MISP/PyIntel471.git@917272fafa8e12102329faca52173e90c5256968#egg=pyintel471
git+https://github.com/D4-project/IPASN-History.git/@a2853c39265cecdd0c0d16850bd34621c0551b87#egg=pyipasnhistory&subdirectory=client
pymisp[email,fileobjects,openioc,pdfexport,url]==2.4.167
pymisp[email,fileobjects,openioc,pdfexport,url]==2.4.175
git+https://github.com/sebdraven/pyonyphe@d1d6741f8ea4475f3bb77ff20c876f08839cabd1#egg=pyonyphe
pyparsing==2.4.7 ; python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'
pypdns==1.5.2
pypssl==2.2
pyrsistent==0.19.3 ; python_version >= '3.7'
pysafebrowsing==0.1.2
pytesseract==0.3.10
python-baseconv==1.2.2
python-dateutil==2.8.2 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'
@ -127,7 +130,7 @@ python-magic==0.4.27
python-pptx==0.6.21
python-socketio[client]==5.8.0 ; python_version >= '3.6'
python-utils==3.5.2 ; python_version >= '3.7'
pytz==2019.3
pytz==2023.3
pytz-deprecation-shim==0.1.0.post0 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5'
pyyaml==6.0 ; python_version >= '3.6'
pyzbar==0.1.9
@ -136,16 +139,17 @@ rdflib==6.3.2 ; python_version >= '3.7' and python_version < '4.0'
red-black-tree-mod==1.20
redis==4.5.5 ; python_version >= '3.7'
regex==2023.5.5 ; python_version >= '3.6'
reportlab==3.6.13
requests[security]==2.30.0
reportlab==4.0.4
requests[security]==2.31.0
requests-cache==0.6.4 ; python_version >= '3.6'
requests-file==1.5.1
rich==13.3.5 ; python_full_version >= '3.7.0'
rtfde==0.0.2
rtfde==0.1.0
secretstorage==3.3.3 ; sys_platform == 'linux'
setuptools==67.7.2 ; python_version >= '3.7'
shodan==1.29.1
sigmatools==0.19.1
sigmf==1.1.1
simplejson==3.19.1 ; python_version >= '2.5' and python_version not in '3.0, 3.1, 3.2, 3.3'
six==1.16.0 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'
sniffio==1.3.0 ; python_version >= '3.7'
@ -173,6 +177,7 @@ validators==0.14.0
vt-graph-api==2.2.0
vt-py==0.17.5
vulners==2.0.10
vysion==1.0.10
wand==0.6.11
websocket-client==1.5.1 ; python_version >= '3.7'
websockets==11.0.3 ; python_version >= '3.7'

View File

@ -75,6 +75,7 @@ For more information: [Extending MISP with Python modules](https://www.circl.lu/
* [VMray](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/vmray_submit.py) - a module to submit a sample to VMray.
* [VulnDB](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/vulndb.py) - a module to query [VulnDB](https://www.riskbasedsecurity.com/).
* [Vulners](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/vulners.py) - an expansion module to expand information about CVEs using Vulners API.
* [Vysion](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/vysion.py) - an expansion module to add dark web intelligence using Vysion API.
* [whois](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/whois.py) - a module to query a local instance of [uwhois](https://github.com/rafiot/uwhoisd).
* [wikidata](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/wiki.py) - a [wikidata](https://www.wikidata.org) expansion module.
* [xforce](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/xforceexchange.py) - an IBM X-Force Exchange expansion module.

BIN
docs/logos/cluster25.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.6 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.0 KiB

BIN
docs/logos/vysion.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 191 KiB

View File

@ -58,7 +58,7 @@ A module tu query the AssemblyLine API with a submission ID to get the submissio
- **output**:
>MISP attributes & objects parsed from the AssemblyLine submission.
- **references**:
>https://www.cyber.cg.ca/en/assemblyline
>https://www.cyber.gc.ca/en/assemblyline
- **requirements**:
>assemblyline_client: Python library to query the AssemblyLine rest API.
@ -207,6 +207,39 @@ Modules to access CIRCL Passive SSL.
-----
#### [cluster25_expand](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/cluster25_expand.py)
<img src=logos/cluster25.png height=60>
Module to query Cluster25 CTI.
- **features**:
>This module takes a MISP attribute value as input to query the Cluster25CTI API. The result is then mapped into compatible MISP Objects and relative attributes.
>
- **input**:
>An Indicator value of type included in the following list:
>- domain
>- email-src
>- email-dst
>- filename
>- md5
>- sha1
>- sha256
>- ip-src
>- ip-dst
>- url
>- vulnerability
>- btc
>- xmr
> ja3-fingerprint-md5
- **output**:
>A series of c25 MISP Objects with colletion of attributes mapped from Cluster25 CTI query result.
- **references**:
>
- **requirements**:
>A Cluster25 API access (API id & key)
-----
#### [countrycode](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/countrycode.py)
Module to expand country codes.
@ -780,6 +813,28 @@ Module to access intelmqs eventdb.
-----
#### [ip2locationio](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/ip2locationio.py)
<img src=logos/ip2locationio.png height=60>
An expansion module to query IP2Location.io to gather more information on a given IP address.
- **features**:
>The module takes an IP address attribute as input and queries the IP2Location.io API.
>Free plan user will get the basic geolocation informaiton, and different subsription plan will get more information on the IP address.
> Refer to [pricing page](https://www.ip2location.io/pricing) for more information on data available for each plan.
>
>More information on the responses content is available in the [documentation](https://www.ip2location.io/ip2location-documentation).
- **input**:
>IP address attribute.
- **output**:
>Additional information on the IP address, such as geolocation, proxy and so on. Refer to the Response Format section in https://www.ip2location.io/ip2location-documentation to find out the full format of the data returned.
- **references**:
>https://www.ip2location.io/ip2location-documentation
- **requirements**:
>An IP2Location.io token
-----
#### [ipasn](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/ipasn.py)
Module to query an IP ASN history service (https://github.com/D4-project/IPASN-History).
@ -1459,6 +1514,24 @@ An expansion hover module to perform a syntax check on sigma rules.
-----
#### [sigmf-expand](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/sigmf-expand.py)
Enrichs a SigMF Recording or extracts a SigMF Archive into a SigMF Recording.
- **features**:
>This module can be used to expand a SigMF Recording object into a SigMF Expanded Recording object with a waterfall plot or to extract a SigMF Archive object into a SigMF Recording objet.
- **input**:
>Object of sigmf-archive or sigmf-recording template.
- **output**:
>Object of sigmf-expanded-recording or sigmf-recording template.
- **references**:
>https://github.com/sigmf/SigMF
- **requirements**:
> - matplotlib: For plotting the waterfall plot of the recording.
> - numpy: For the waterfall plot of the recording.
> - sigmf: For validating SigMF files.
-----
#### [socialscan](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/socialscan.py)
A hover module to get information on the availability of an email address or username on some online platforms.
@ -1831,6 +1904,26 @@ An expansion hover module to expand information about CVE id using Vulners API.
-----
#### [Vysion](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/vysion.py)
<img src=logos/vysion.png height=60>
Module to enrich the information by making use of the Vysion API.
- **features**:
>This module gets correlated information from our dark web intelligence database. With this you will get several objects containing information related to, for example, an organization victim of a ransomware attack.
>MISP objects containing title, link to our webapp and TOR, i2p or clearnet URLs.
- **input**:
>MISP Attribute which include: company(target-org), country, info.
- **output**:
>MISP objects containing title, link to our webapp and TOR, i2p or clearnet URLs.
- **references**:
>https://vysion.ai/
- **requirements**:
> Vysion python library
> Vysion API Key
-----
#### [whois](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/whois.py)
Module to query a local instance of uwhois (https://github.com/rafiot/uwhoisd).
@ -1847,6 +1940,38 @@ Module to query a local instance of uwhois (https://github.com/rafiot/uwhoisd).
-----
#### [whoisfreaks](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/whoisfreaks.py)
<img src=logos/whoisfreaks.png height=60>
An expansion module for https://whoisfreaks.com/ that will provide an enriched analysis of the provided domain, including WHOIS and DNS information.
Our Whois service, DNS Lookup API, and SSL analysis, equips organizations with comprehensive threat intelligence and attack surface analysis capabilities for enhanced security.
Explore our website's product section at https://whoisfreaks.com/ for a wide range of additional services catering to threat intelligence and attack surface analysis needs.
- **features**:
>The module takes a domain as input and queries the Whoisfreaks API with it.
>
>Some parsing operations are then processed on the result of the query to extract as much information as possible.
>
>After this we map the extracted data to MISP attributes.
- **input**:
>A domain whose Data is required
- **output**:
>MISP attributes resulting from the query on Whoisfreaks API, included in the following list:
>- domain
>- dns-soa-email
>- whois-registrant-email
>- whois-registrant-phone
>- whois-registrant-name
>- whois-registrar
>- whois-creation-date
>- domain
- **references**:
>https://whoisfreaks.com/
- **requirements**:
>An access to the Whoisfreaks API_KEY
-----
#### [wiki](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/wiki.py)
<img src=logos/wikidata.png height=60>

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.6 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.0 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 191 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

View File

@ -55,7 +55,7 @@ A module tu query the AssemblyLine API with a submission ID to get the submissio
- **output**:
>MISP attributes & objects parsed from the AssemblyLine submission.
- **references**:
>https://www.cyber.cg.ca/en/assemblyline
>https://www.cyber.gc.ca/en/assemblyline
- **requirements**:
>assemblyline_client: Python library to query the AssemblyLine rest API.
@ -204,6 +204,39 @@ Modules to access CIRCL Passive SSL.
-----
#### [cluster25_expand](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/cluster25_expand.py)
<img src=../logos/cluster25.png height=60>
Module to query Cluster25 CTI.
- **features**:
>This module takes a MISP attribute value as input to query the Cluster25CTI API. The result is then mapped into compatible MISP Objects and relative attributes.
>
- **input**:
>An Indicator value of type included in the following list:
>- domain
>- email-src
>- email-dst
>- filename
>- md5
>- sha1
>- sha256
>- ip-src
>- ip-dst
>- url
>- vulnerability
>- btc
>- xmr
> ja3-fingerprint-md5
- **output**:
>A series of c25 MISP Objects with colletion of attributes mapped from Cluster25 CTI query result.
- **references**:
>
- **requirements**:
>A Cluster25 API access (API id & key)
-----
#### [countrycode](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/countrycode.py)
Module to expand country codes.
@ -777,6 +810,28 @@ Module to access intelmqs eventdb.
-----
#### [ip2locationio](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/ip2locationio.py)
<img src=../logos/ip2locationio.png height=60>
An expansion module to query IP2Location.io to gather more information on a given IP address.
- **features**:
>The module takes an IP address attribute as input and queries the IP2Location.io API.
>Free plan user will get the basic geolocation informaiton, and different subsription plan will get more information on the IP address.
> Refer to [pricing page](https://www.ip2location.io/pricing) for more information on data available for each plan.
>
>More information on the responses content is available in the [documentation](https://www.ip2location.io/ip2location-documentation).
- **input**:
>IP address attribute.
- **output**:
>Additional information on the IP address, such as geolocation, proxy and so on. Refer to the Response Format section in https://www.ip2location.io/ip2location-documentation to find out the full format of the data returned.
- **references**:
>https://www.ip2location.io/ip2location-documentation
- **requirements**:
>An IP2Location.io token
-----
#### [ipasn](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/ipasn.py)
Module to query an IP ASN history service (https://github.com/D4-project/IPASN-History).
@ -1456,6 +1511,24 @@ An expansion hover module to perform a syntax check on sigma rules.
-----
#### [sigmf-expand](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/sigmf-expand.py)
Enrichs a SigMF Recording or extracts a SigMF Archive into a SigMF Recording.
- **features**:
>This module can be used to expand a SigMF Recording object into a SigMF Expanded Recording object with a waterfall plot or to extract a SigMF Archive object into a SigMF Recording objet.
- **input**:
>Object of sigmf-archive or sigmf-recording template.
- **output**:
>Object of sigmf-expanded-recording or sigmf-recording template.
- **references**:
>https://github.com/sigmf/SigMF
- **requirements**:
> - matplotlib: For plotting the waterfall plot of the recording.
> - numpy: For the waterfall plot of the recording.
> - sigmf: For validating SigMF files.
-----
#### [socialscan](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/socialscan.py)
A hover module to get information on the availability of an email address or username on some online platforms.
@ -1844,6 +1917,38 @@ Module to query a local instance of uwhois (https://github.com/rafiot/uwhoisd).
-----
#### [whoisfreaks](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/whoisfreaks.py)
<img src=../logos/whoisfreaks.png height=60>
An expansion module for https://whoisfreaks.com/ that will provide an enriched analysis of the provided domain, including WHOIS and DNS information.
Our Whois service, DNS Lookup API, and SSL analysis, equips organizations with comprehensive threat intelligence and attack surface analysis capabilities for enhanced security.
Explore our website's product section at https://whoisfreaks.com/ for a wide range of additional services catering to threat intelligence and attack surface analysis needs.
- **features**:
>The module takes a domain as input and queries the Whoisfreaks API with it.
>
>Some parsing operations are then processed on the result of the query to extract as much information as possible.
>
>After this we map the extracted data to MISP attributes.
- **input**:
>A domain whose Data is required
- **output**:
>MISP attributes resulting from the query on Whoisfreaks API, included in the following list:
>- domain
>- dns-soa-email
>- whois-registrant-email
>- whois-registrant-phone
>- whois-registrant-name
>- whois-registrar
>- whois-creation-date
>- domain
- **references**:
>https://whoisfreaks.com/
- **requirements**:
>An access to the Whoisfreaks API_KEY
-----
#### [wiki](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/wiki.py)
<img src=../logos/wikidata.png height=60>

View File

@ -7,7 +7,7 @@
"input": "Link of an AssemblyLine submission report.",
"output": "MISP attributes & objects parsed from the AssemblyLine submission.",
"references": [
"https://www.cyber.cg.ca/en/assemblyline"
"https://www.cyber.gc.ca/en/assemblyline"
],
"features": "The module requires the address of the AssemblyLine server you want to query as well as your credentials used for this instance. Credentials include the used-ID and an API key or the password associated to the user-ID.\n\nThe submission ID extracted from the submission link is then used to query AssemblyLine and get the full submission report. This report is parsed to extract file objects and the associated IPs, domains or URLs the files are connecting to.\n\nSome more data may be parsed in the future."
}
}

View File

@ -0,0 +1,14 @@
{
"description": "Module to query Cluster25 CTI.",
"logo": "cluster25.png",
"requirements": [
"A Cluster25 API access (API id & key)"
],
"input": "An Indicator value of type included in the following list:\n- domain\n- email-src\n- email-dst\n- filename\n- md5\n- sha1\n- sha256\n- ip-src\n- ip-dst\n- url\n- vulnerability\n- btc\n- xmr\n ja3-fingerprint-md5",
"output": "A series of c25 MISP Objects with colletion of attributes mapped from Cluster25 CTI query result.",
"references": [
""
],
"features": "This module takes a MISP attribute value as input to query the Cluster25CTI API. The result is then mapped into compatible MISP Objects and relative attributes.\n"
}

View File

@ -0,0 +1,13 @@
{
"description": "An expansion module to query IP2Location.io to gather more information on a given IP address.",
"logo": "ip2locationio.png",
"requirements": [
"An IP2Location.io token"
],
"input": "IP address attribute.",
"output": "Additional information on the IP address, such as geolocation, proxy and so on. Refer to the Response Format section in https://www.ip2location.io/ip2location-documentation to find out the full format of the data returned.",
"references": [
"https://www.ip2location.io/ip2location-documentation"
],
"features": "The module takes an IP address attribute as input and queries the IP2Location.io API. \nFree plan user will get the basic geolocation informaiton, and different subsription plan will get more information on the IP address. \n Refer to [pricing page](https://www.ip2location.io/pricing) for more information on data available for each plan. \n\nMore information on the responses content is available in the [documentation](https://www.ip2location.io/ip2location-documentation)."
}

View File

@ -0,0 +1,14 @@
{
"description": "Enrichs a SigMF Recording or extracts a SigMF Archive into a SigMF Recording.",
"requirements": [
"matplotlib: For plotting the waterfall plot of the recording.",
"numpy: For the waterfall plot of the recording.",
"sigmf: For validating SigMF files."
],
"input": "Object of sigmf-archive or sigmf-recording template.",
"output": "Object of sigmf-expanded-recording or sigmf-recording template.",
"references": [
"https://github.com/sigmf/SigMF"
],
"features": "This module can be used to expand a SigMF Recording object into a SigMF Expanded Recording object with a waterfall plot or to extract a SigMF Archive object into a SigMF Recording objet."
}

View File

@ -0,0 +1,16 @@
{
"description": "Module to enrich the information by making use of the Vysion API.",
"logo": "vysion.png",
"requirements": [
"Vysion python library",
"Vysion API Key"
],
"input": "MISP Attribute which include: company(target-org), country, info.",
"output": "MISP objects containing title, link to our webapp and TOR, i2p or clearnet URLs.",
"references": [
"https://vysion.ai/",
"https://developers.vysion.ai/",
"https://github.com/ByronLabs/vysion-cti/tree/main"
],
"features": "This module gets correlated information from our dark web intelligence database. With this you will get several objects containing information related to, for example, an organization victim of a ransomware attack."
}

View File

@ -0,0 +1,13 @@
{
"description": "An expansion module for https://whoisfreaks.com/ that will provide an enriched analysis of the provided domain, including WHOIS and DNS information.\nOur Whois service, DNS Lookup API, and SSL analysis, equips organizations with comprehensive threat intelligence and attack surface analysis capabilities for enhanced security. \nExplore our website's product section at https://whoisfreaks.com/ for a wide range of additional services catering to threat intelligence and attack surface analysis needs.",
"logo": "whoisfreaks.png",
"requirements": [
"An access to the Whoisfreaks API_KEY"
],
"input": "A domain whose Data is required",
"output": "MISP attributes resulting from the query on Whoisfreaks API, included in the following list:\n- domain\n- dns-soa-email\n- whois-registrant-email\n- whois-registrant-phone\n- whois-registrant-name\n- whois-registrar\n- whois-creation-date\n- domain",
"references": [
"https://whoisfreaks.com/"
],
"features": "The module takes a domain as input and queries the Whoisfreaks API with it.\n\nSome parsing operations are then processed on the result of the query to extract as much information as possible.\n\nAfter this we map the extracted data to MISP attributes."
}

View File

@ -1,5 +1,4 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Core MISP expansion modules loader and web service
#
@ -23,7 +22,6 @@ import os
import signal
import sys
import importlib
import json
import logging
import fnmatch
import argparse
@ -31,11 +29,17 @@ import re
import datetime
import psutil
try:
import orjson as json
except ImportError:
import json
import tornado.web
import tornado.process
from tornado.ioloop import IOLoop
from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor
from pymisp import pymisp_json_default
try:
from .modules import * # noqa
@ -58,18 +62,21 @@ def handle_signal(sig, frame):
IOLoop.instance().add_callback_from_signal(IOLoop.instance().stop)
def init_logger(level=False):
def init_logger(debug=False):
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler = logging.StreamHandler(stream=sys.stdout)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
handler.setLevel(logging.INFO)
if level:
handler.setLevel(logging.DEBUG)
# Enable access logs
access_log = logging.getLogger('tornado.access')
access_log.propagate = False
access_log.setLevel(logging.INFO)
access_log.addHandler(handler)
# Set application log
log.addHandler(handler)
log.setLevel(logging.INFO)
if level:
log.setLevel(logging.DEBUG)
return log
log.propagate = False
log.setLevel(logging.DEBUG if debug else logging.INFO)
def load_helpers(helpersdir):
@ -89,28 +96,28 @@ def load_helpers(helpersdir):
selftest = hhandlers[helpername].selftest()
if selftest is None:
helpers.append(helpername)
log.info('Helpers loaded {} '.format(filename))
log.info(f'Helpers loaded {filename}')
else:
log.info('Helpers failed {} due to {}'.format(filename, selftest))
log.warning(f'Helpers failed {filename} due to {selftest}')
def load_package_helpers():
if not HAS_PACKAGE_HELPERS:
log.info('Unable to load MISP helpers from package.')
sys.exit()
log.error('Unable to load MISP helpers from package.')
sys.exit(1)
mhandlers = {}
helpers = []
for path, helper in sys.modules.items():
if not path.startswith('misp_modules.helpers.'):
continue
helpername = path.replace('misp_modules.helpers.', '')
mhandlers[helpername] = helper
selftest = mhandlers[helpername].selftest()
helper_name = path.replace('misp_modules.helpers.', '')
mhandlers[helper_name] = helper
selftest = mhandlers[helper_name].selftest()
if selftest is None:
helpers.append(helpername)
log.info('Helper loaded {}'.format(helpername))
helpers.append(helper_name)
log.info(f'Helper loaded {helper_name}')
else:
log.info('Helpers failed {} due to {}'.format(helpername, selftest))
log.warning(f'Helpers failed {helper_name} due to {selftest}')
return mhandlers, helpers
@ -128,51 +135,61 @@ def load_modules(mod_dir):
continue
if filename == '__init__.py':
continue
modulename = filename.split(".")[0]
moduletype = os.path.split(mod_dir)[1]
module_name = filename.split(".")[0]
module_type = os.path.split(mod_dir)[1]
try:
mhandlers[modulename] = importlib.import_module(os.path.basename(root) + '.' + modulename)
mhandlers[module_name] = importlib.import_module(os.path.basename(root) + '.' + module_name)
except Exception as e:
log.warning('MISP modules {0} failed due to {1}'.format(modulename, e))
log.warning(f'MISP modules {module_name} failed due to {e}')
continue
modules.append(modulename)
log.info('MISP modules {0} imported'.format(modulename))
mhandlers['type:' + modulename] = moduletype
modules.append(module_name)
log.info(f'MISP modules {module_name} imported')
mhandlers['type:' + module_name] = module_type
return mhandlers, modules
def load_package_modules():
if not HAS_PACKAGE_MODULES:
log.info('Unable to load MISP modules from package.')
sys.exit()
log.error('Unable to load MISP modules from package.')
sys.exit(1)
mhandlers = {}
modules = []
for path, module in sys.modules.items():
r = re.findall(r"misp_modules[.]modules[.](\w+)[.]([^_]\w+)", path)
if r and len(r[0]) == 2:
moduletype, modulename = r[0]
mhandlers[modulename] = module
modules.append(modulename)
log.info('MISP modules {0} imported'.format(modulename))
mhandlers['type:' + modulename] = moduletype
module_type, module_name = r[0]
mhandlers[module_name] = module
modules.append(module_name)
log.info(f'MISP modules {module_name} imported')
mhandlers['type:' + module_name] = module_type
return mhandlers, modules
class Healthcheck(tornado.web.RequestHandler):
def get(self):
self.write(b'{"status": true}')
class ListModules(tornado.web.RequestHandler):
global loaded_modules
global mhandlers
_cached_json = None
def get(self):
ret = []
for module in loaded_modules:
x = {}
x['name'] = module
x['type'] = mhandlers['type:' + module]
x['mispattributes'] = mhandlers[module].introspection()
x['meta'] = mhandlers[module].version()
ret.append(x)
if not self._cached_json:
ret = []
for module_name in loaded_modules:
ret.append({
'name': module_name,
'type': mhandlers['type:' + module_name],
'mispattributes': mhandlers[module_name].introspection(),
'meta': mhandlers[module_name].version()
})
self._cached_json = json.dumps(ret)
log.debug('MISP ListModules request')
self.write(json.dumps(ret))
self.write(self._cached_json)
class QueryModule(tornado.web.RequestHandler):
@ -183,28 +200,34 @@ class QueryModule(tornado.web.RequestHandler):
executor = ThreadPoolExecutor(nb_threads)
@run_on_executor
def run_request(self, module, jsonpayload):
log.debug('MISP QueryModule request {0}'.format(jsonpayload))
response = mhandlers[module].handler(q=jsonpayload)
return json.dumps(response)
def run_request(self, module_name, json_payload, dict_payload):
log.debug('MISP QueryModule %s request %s', module_name, json_payload)
module = mhandlers[module_name]
if getattr(module, "dict_handler", None):
# New method that avoids double JSON decoding, new modules should define dict_handler
response = module.dict_handler(request=dict_payload)
else:
response = module.handler(q=json_payload)
return json.dumps(response, default=pymisp_json_default)
@tornado.gen.coroutine
def post(self):
try:
jsonpayload = self.request.body.decode('utf-8')
dict_payload = json.loads(jsonpayload)
json_payload = self.request.body
dict_payload = json.loads(json_payload)
if dict_payload.get('timeout'):
timeout = datetime.timedelta(seconds=int(dict_payload.get('timeout')))
else:
timeout = datetime.timedelta(seconds=300)
response = yield tornado.gen.with_timeout(timeout, self.run_request(dict_payload['module'], jsonpayload))
future = self.run_request(dict_payload['module'], json_payload, dict_payload)
response = yield tornado.gen.with_timeout(timeout, future)
self.write(response)
except tornado.gen.TimeoutError:
log.warning('Timeout on {} '.format(dict_payload['module']))
log.warning('Timeout on {}'.format(dict_payload['module']))
self.write(json.dumps({'error': 'Timeout.'}))
except Exception:
self.write(json.dumps({'error': 'Something went wrong, look in the server logs for details'}))
log.exception('Something went wrong:')
log.exception('Something went wrong when processing query request')
finally:
self.finish()
@ -223,20 +246,20 @@ def main():
global loaded_modules
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)
argParser = argparse.ArgumentParser(description='misp-modules server', formatter_class=argparse.RawTextHelpFormatter)
argParser.add_argument('-t', default=False, action='store_true', help='Test mode')
argParser.add_argument('-s', default=False, action='store_true', help='Run a system install (package installed via pip)')
argParser.add_argument('-d', default=False, action='store_true', help='Enable debugging')
argParser.add_argument('-p', default=6666, help='misp-modules TCP port (default 6666)')
argParser.add_argument('-l', default='localhost', help='misp-modules listen address (default localhost)')
argParser.add_argument('-m', default=[], action='append', help='Register a custom module')
argParser.add_argument('--devel', default=False, action='store_true', help='''Start in development mode, enable debug, start only the module(s) listed in -m.\nExample: -m misp_modules.modules.expansion.bgpranking''')
args = argParser.parse_args()
port = args.p
listen = args.l
arg_parser = argparse.ArgumentParser(description='misp-modules server', formatter_class=argparse.RawTextHelpFormatter)
arg_parser.add_argument('-t', '--test', default=False, action='store_true', help='Test mode')
arg_parser.add_argument('-s', '--system', default=False, action='store_true', help='Run a system install (package installed via pip)')
arg_parser.add_argument('-d', '--debug', default=False, action='store_true', help='Enable debugging')
arg_parser.add_argument('-p', '--port', default=6666, help='misp-modules TCP port (default 6666)')
arg_parser.add_argument('-l', '--listen', default='localhost', help='misp-modules listen address (default localhost)')
arg_parser.add_argument('-m', default=[], action='append', help='Register a custom module')
arg_parser.add_argument('--devel', default=False, action='store_true', help='''Start in development mode, enable debug, start only the module(s) listed in -m.\nExample: -m misp_modules.modules.expansion.bgpranking''')
args = arg_parser.parse_args()
if args.devel:
log = init_logger(level=True)
log.info('Launch MISP modules server in developement mode. Enable debug, load a list of modules is -m is used.')
init_logger(debug=True)
log.info('Launch MISP modules server in development mode. Enable debug, load a list of modules is -m is used.')
if args.m:
mhandlers = {}
modules = []
@ -247,12 +270,12 @@ def main():
mhandlers[modulename] = importlib.import_module(module)
mhandlers['type:' + modulename] = moduletype
modules.append(modulename)
log.info('MISP modules {0} imported'.format(modulename))
log.info(f'MISP modules {modulename} imported')
else:
mhandlers, loaded_modules = _launch_from_current_dir()
else:
log = init_logger(level=args.d)
if args.s:
init_logger(debug=args.debug)
if args.system:
log.info('Launch MISP modules server from package.')
load_package_helpers()
mhandlers, loaded_modules = load_package_modules()
@ -263,11 +286,15 @@ def main():
mispmod = importlib.import_module(module)
mispmod.register(mhandlers, loaded_modules)
service = [(r'/modules', ListModules), (r'/query', QueryModule)]
service = [
(r'/modules', ListModules),
(r'/query', QueryModule),
(r'/healthcheck', Healthcheck),
]
application = tornado.web.Application(service)
try:
application.listen(port, address=listen)
application.listen(args.port, address=args.listen)
except Exception as e:
if e.errno == 98:
pids = psutil.pids()
@ -279,14 +306,17 @@ def main():
print("\nmisp-modules is still running as PID: {}\n".format(pid))
print("Please kill accordingly:")
print("sudo kill {}".format(pid))
sys.exit(-1)
return 1
print(e)
print("misp-modules might still be running.")
else:
log.exception(f"Could not listen on {args.listen}:{args.port}")
return 1
log.info('MISP modules server started on {0} port {1}'.format(listen, port))
if args.t:
log.info(f'MISP modules server started on {args.listen} port {args.port}')
if args.test:
log.info('MISP modules started in test-mode, quitting immediately.')
sys.exit()
return 0
try:
IOLoop.instance().start()
finally:

View File

@ -91,7 +91,7 @@ class DomainArtifact(Artifact):
attr = obj.add_attribute(
"domain", value=self.domain, to_ids=self.is_ioc, comment=classifications
)
if tag:
if tag and attr:
self.tag_artifact_attribute(attr)
for ip in self.ips:
@ -141,7 +141,7 @@ class EmailArtifact(Artifact):
attr = obj.add_attribute(
"from", value=self.sender, to_ids=self.is_ioc, comment=classifications
)
if tag:
if tag and attr:
self.tag_artifact_attribute(attr)
if self.subject:
@ -220,7 +220,7 @@ class FileArtifact(Artifact):
key, value=value, to_ids=self.is_ioc, comment=classifications
)
if tag:
if tag and attr:
self.tag_artifact_attribute(attr)
if self.mimetype:
@ -277,7 +277,7 @@ class IpArtifact(Artifact):
attr = obj.add_attribute(
"ip", value=self.ip, comment=classifications, to_ids=self.is_ioc
)
if tag:
if tag and attr:
self.tag_artifact_attribute(attr)
return obj
@ -320,7 +320,7 @@ class MutexArtifact(Artifact):
to_ids=False,
comment=classifications,
)
if tag:
if tag and attr:
self.tag_artifact_attribute(attr)
operations = None
@ -377,8 +377,10 @@ class ProcessArtifact(Artifact):
cmd_attr = obj.add_attribute("command-line", value=self.cmd_line)
if tag:
self.tag_artifact_attribute(name_attr)
self.tag_artifact_attribute(cmd_attr)
if name_attr:
self.tag_artifact_attribute(name_attr)
if cmd_attr:
self.tag_artifact_attribute(cmd_attr)
return obj
@ -418,7 +420,7 @@ class RegistryArtifact(Artifact):
attr = obj.add_attribute(
"key", value=self.key, to_ids=self.is_ioc, comment=operations
)
if tag:
if tag and attr:
self.tag_artifact_attribute(attr)
return obj
@ -464,7 +466,7 @@ class UrlArtifact(Artifact):
category="External analysis",
to_ids=False,
)
if tag:
if tag and attr:
self.tag_artifact_attribute(attr)
if self.domain:
@ -698,7 +700,7 @@ class Summary(ReportParser):
for process in processes:
classifications = process.get("classifications", [])
cmd_line = process.get("cmd_line")
name = process["image_name"]
name = process.get("image_name")
verdict = self.to_verdict(process.get("severity"))
is_ioc = process.get("ioc", False)
@ -731,7 +733,7 @@ class Summary(ReportParser):
artifact = UrlArtifact(
url=url["url"],
operations=url["operations"],
operations=url.get("operations", []),
ips=ips,
is_ioc=is_ioc,
verdict=verdict,
@ -871,7 +873,9 @@ class SummaryV2(ReportParser):
continue
for ip_address in self._resolve_refs(ref_ip_addresses):
artifact.ips.append(ip_address["ip_address"])
ip = ip_address.get("ip_address")
if ip is not None:
artifact.ips.append(ip)
yield artifact
@ -956,7 +960,7 @@ class SummaryV2(ReportParser):
artifact = ProcessArtifact(
pid=process["os_pid"],
parent_pid=process["origin_monitor_id"],
filename=process["filename"],
filename=process.get("filename"),
is_ioc=process["is_ioc"],
cmd_line=cmd_line,
classifications=classifications,
@ -978,17 +982,19 @@ class SummaryV2(ReportParser):
for url in self._resolve_refs(url_refs):
domain = None
ref_domain = url.get("ref_domain", {})
if ref_domain:
if ref_domain and self._resolve_ref(ref_domain).get("domain") is not None:
domain = self._resolve_ref(ref_domain)["domain"]
ips = []
ref_ip_addresses = url.get("ref_ip_addresses", [])
for ip_address in self._resolve_refs(ref_ip_addresses):
ips.append(ip_address["ip_address"])
ip = ip_address.get("ip_address")
if ip is not None:
ips.append(ip)
artifact = UrlArtifact(
url=url["url"],
operations=url["operations"],
operations=url.get("operations", []),
is_ioc=url["is_ioc"],
domain=domain,
ips=ips,

@ -1 +1 @@
Subproject commit 9dc7e3578f2165e32a3b7cdd09e9e552f2d98d36
Subproject commit 9c8b9504257c65459cfedf4f3231aee74f77dbe3

View File

@ -1,4 +1,5 @@
import json
from pyfaup.faup import Faup
from mattermostdriver import Driver
from ._utils import utils
@ -9,7 +10,7 @@ moduleconfig = {
'params': {
'mattermost_hostname': {
'type': 'string',
'description': 'The Mattermost domain',
'description': 'The Mattermost domain or URL',
'value': 'example.mattermost.com',
},
'bot_access_token': {
@ -44,15 +45,18 @@ moduleinfo = {'version': '0.1', 'author': 'Sami Mokaddem',
'description': 'Simplistic module to send message to a Mattermost channel.',
'module-type': ['action']}
f = Faup()
def createPost(request):
params = request['params']
f.decode(params['mattermost_hostname'])
parsedURL = f.get()
mm = Driver({
'url': params['mattermost_hostname'],
'url': parsedURL['host'],
'token': params['bot_access_token'],
'scheme': 'https',
'scheme': parsedURL['scheme'] if parsedURL['scheme'] is not None else 'https',
'basepath': '/api/v4',
'port': 443,
'port': int(parsedURL['port']) if parsedURL['port'] is not None else 443,
})
mm.login()

View File

@ -4,8 +4,8 @@ import sys
sys.path.append('{}/lib'.format('/'.join((os.path.realpath(__file__)).split('/')[:-3])))
__all__ = ['cuckoo_submit', 'vmray_submit', 'bgpranking', 'circl_passivedns', 'circl_passivessl',
'countrycode', 'cve', 'cve_advanced', 'cpe', 'dns', 'btc_steroids', 'domaintools', 'eupi',
'eql', 'farsight_passivedns', 'ipasn', 'passivetotal', 'sourcecache', 'virustotal',
'cluster25_expand', 'countrycode', 'cve', 'cve_advanced', 'cpe', 'dns', 'btc_steroids', 'domaintools',
'eupi', 'eql', 'farsight_passivedns', 'ipasn', 'passivetotal', 'sourcecache', 'virustotal',
'whois', 'shodan', 'reversedns', 'geoip_asn', 'geoip_city', 'geoip_country', 'wiki', 'iprep',
'threatminer', 'otx', 'threatcrowd', 'vulndb', 'crowdstrike_falcon',
'yara_syntax_validator', 'hashdd', 'onyphe', 'onyphe_full', 'rbl',
@ -20,7 +20,7 @@ __all__ = ['cuckoo_submit', 'vmray_submit', 'bgpranking', 'circl_passivedns', 'c
'trustar_enrich', 'recordedfuture', 'html_to_markdown', 'socialscan', 'passive-ssh',
'qintel_qsentry', 'mwdb', 'hashlookup', 'mmdb_lookup', 'ipqs_fraud_and_risk_scoring',
'clamav', 'jinja_template_rendering','hyasinsight', 'variotdbs', 'crowdsec',
'extract_url_components', 'ipinfo']
'extract_url_components', 'ipinfo', 'whoisfreaks', 'ip2locationio', 'vysion']
minimum_required_fields = ('type', 'uuid', 'value')

View File

@ -0,0 +1,136 @@
import requests
import json
from pymisp import MISPObject, MISPAttribute, MISPEvent
from . import check_input_attribute, checking_error, standard_error_message
import dns.resolver
misperrors = {'error': 'Error'}
mispattributes = {'input': ['ip-src', 'ip-dst', 'hostname', 'domain', 'domain|ip'], 'format': 'misp_standard'}
moduleinfo = {'version': '0.1', 'author': 'Stephanie S',
'description': 'AbuseIPDB MISP expansion module',
'module-type': ['expansion', 'hover']}
moduleconfig = ['api_key', 'max_age_in_days', 'abuse_threshold']
def get_ip(request):
# Need to get the ip from the domain
resolver = dns.resolver.Resolver()
resolver.timeout = 2
resolver.lifetime = 2
try:
ip = resolver.query(request["attribute"]["value"], 'A')
return ip
except dns.resolver.NXDOMAIN:
misperrors['error'] = "NXDOMAIN"
return misperrors
except dns.exception.Timeout:
misperrors['error'] = "Timeout"
return misperrors
except Exception:
misperrors['error'] = "DNS resolving error"
return misperrors
def handler(q=False):
if q is False:
return False
request = json.loads(q)
if "config" not in request or "api_key" not in request["config"]:
return {"error": "AbuseIPDB API key is missing"}
if "max_age_in_days" not in request["config"]:
return {"error": "AbuseIPDB max age in days is missing"}
if "abuse_threshold" not in request["config"]:
return {"error": "AbuseIPDB abuse threshold is missing"}
if not request.get('attribute') or not check_input_attribute(request['attribute'], requirements=('type', 'value')):
return {'error': f'{standard_error_message}, {checking_error}.'}
if request['attribute']['type'] not in mispattributes['input']:
return {'error': 'Unsupported attribute type.'}
if (request['attribute']['type'] == 'hostname' or request['attribute']['type'] == 'domain' or request['attribute']['type'] == 'domain|ip'):
ip = get_ip(request)[0]
else:
ip = request["attribute"]["value"]
api_key = request["config"]["api_key"]
max_age_in_days = request["config"]["max_age_in_days"]
api_endpoint = 'https://api.abuseipdb.com/api/v2/check'
querystring = {
'ipAddress': ip,
'maxAgeInDays': max_age_in_days
}
headers = {
'Accept': 'application/json',
'key': api_key
}
r = {"results": []}
response = requests.request(method='GET', url=api_endpoint, headers=headers, params=querystring)
if (response.status_code == 200):
response_json = json.loads(response.text)
is_whitelisted = response_json['data']['isWhitelisted']
is_tor = response_json['data']['isTor']
is_public = response_json['data']['isPublic']
abuse_confidence_score = response_json['data']['abuseConfidenceScore']
abuse_threshold = request["config"]["abuse_threshold"]
if (request["config"]["abuse_threshold"] is not None):
abuse_threshold = request["config"]["abuse_threshold"]
else:
abuse_threshold = 70
if (is_whitelisted == False):
is_whitelisted = 0
if (is_tor == False):
is_tor = 0
if (is_public == False):
is_public = 0
if (abuse_confidence_score == None):
abuse_confidence_score = 0
if (response_json.get("errors")):
return {'error': 'AbuseIPDB error, check logs'}
else:
event = MISPEvent()
obj = MISPObject('abuseipdb')
event.add_attribute(**request['attribute'])
if int(abuse_confidence_score) >= int(abuse_threshold):
malicious_attribute = obj.add_attribute('is-malicious', **{'type': 'boolean', 'value': 1})
malicious_attribute.add_tag(f'ioc:artifact-state="malicious"')
else:
malicious_attribute = obj.add_attribute('is-malicious', **{'type': 'boolean', 'value': 0})
malicious_attribute.add_tag(f'ioc:artifact-state="not-malicious"')
if is_whitelisted is not None:
obj.add_attribute('is-whitelisted', **{'type': 'boolean', 'value': is_whitelisted})
obj.add_attribute('is-tor', **{'type': 'boolean', 'value': is_tor})
obj.add_attribute('is-public', **{'type': 'boolean', 'value': is_public})
obj.add_attribute('abuse-confidence-score', **{'type': 'counter', 'value': abuse_confidence_score})
obj.add_reference(request['attribute']['uuid'], "describes")
event.add_object(obj)
# Avoid serialization issue
event = json.loads(event.to_json())
r['results'] = {'Object': event['Object'], 'Attribute': event['Attribute']}
return r
else:
try:
response_json = json.loads(response.text)
if (response_json['errors']):
return {"error": "API not reachable, status code: " + str(response.status_code) + " " + str(response_json['errors'][0]['detail'])}
except:
pass
return {"error": "API not reachable, status code: " + str(response.status_code)}
def introspection():
return mispattributes
def version():
moduleinfo['config'] = moduleconfig
return moduleinfo

View File

@ -16,14 +16,14 @@ misperrors = {'error': 'Error'}
mispattributes = {'input': ["domain", "domain|ip", "hostname", "ip-dst", "ip-src", "ip-dst|port", "ip-src|port", "url",
"md5", "sha1", "sha256", "filename|md5", "filename|sha1", "filename|sha256"],
'output': ["domain", "ip-dst", "url", "comment", "md5", "sha1", "sha256"]
'output': ["domain", "ip-dst", "url", "comment", "md5", "sha1", "sha256", "link", "text"]
}
moduleinfo = {'version': '0.1', 'author': 'Davide Baglieri aka davidonzo',
moduleinfo = {'version': '0.2', 'author': 'Davide Baglieri aka davidonzo',
'description': 'On demand query API for OSINT.digitalside.it project.',
'module-type': ['expansion', 'hover']}
moduleconfig = ['import_related_hashes', 'cache', 'cache_directory']
moduleconfig = ['STIX2_details', 'import_related', 'cache', 'cache_directory', 'cache_timeout_h', 'local_directory']
def handler(q=False):
@ -62,18 +62,49 @@ def handler(q=False):
tosubmit.append(request['filename|sha256'].split('|')[1])
else:
return False
persistent = 0
if request.get('persistent'):
persistent = request["persistent"]
submitcache = False
submitcache_directory = False
import_related_hashes = False
submitcache_timeout = False
submit_stix = False
import_related = False
sumbit_localdirectory = False
r = {"results": []}
if request.get('config'):
if request['config'].get('cache') and request['config']['cache'].lower() == "yes":
submitcache = True
if request['config'].get('import_related_hashes') and request['config']['import_related_hashes'].lower() == "yes":
import_related_hashes = True
if request['config'].get('import_related') and request['config']['import_related'].lower() == "yes":
import_related = True
if request['config'].get('STIX2_details') and request['config']['STIX2_details'].lower() == "yes":
submit_stix = True
if request['config'].get('cache_timeout_h') and len(request['config']['cache_timeout_h']) > 0:
submitcache_timeout = int(request['config'].get('cache_timeout_h'))
localdirectory = request['config'].get('local_directory')
if localdirectory and len(localdirectory) > 0:
if os.access(localdirectory, os.R_OK):
sumbit_localdirectory = localdirectory
WarningMSG = "Local directory OK! Ignoring cache configuration..."
log.debug(str(WarningMSG))
submitcache = False
sumbitcache_titmeout = False
submitcache_directory = False
else:
ErrorMSG = "Unable to read local 'Threat-Intel' directory ("+localdirectory+"). Please, check your configuration and retry."
log.debug(str(ErrorMSG))
misperrors['error'] = ErrorMSG
return misperrors
if submitcache:
cache_directory = request['config'].get('cache_directory')
if cache_directory and len(cache_directory) > 0:
@ -90,52 +121,229 @@ def handler(q=False):
misperrors['error'] = ErrorMSG
return misperrors
else:
log.debug("Cache option is set to " + str(submitcache) + ". You are not using the internal cache system and this is NOT recommended!")
log.debug("Please, consider to turn on the cache setting it to 'Yes' and specifing a writable directory for the cache directory option.")
if sumbit_localdirectory == False:
log.debug("Cache option is set to " + str(submitcache) + ". You are not using the internal cache system and this is NOT recommended!")
log.debug("Please, consider to turn on the cache setting it to 'Yes' and specifing a writable directory for the cache directory option.")
try:
response = apiosintDS.request(entities=tosubmit, cache=submitcache, cachedirectory=submitcache_directory, verbose=True)
r["results"] += reversed(apiosintParser(response, import_related_hashes))
response = apiosintDS.request(entities=tosubmit, stix=submit_stix, cache=submitcache, cachedirectory=submitcache_directory, cachetimeout=submitcache_timeout, verbose=True, localdirectory=sumbit_localdirectory)
r["results"] += apiosintParserHover(persistent, response, import_related, submit_stix)
return r
except Exception as e:
log.debug(str(e))
misperrors['error'] = str(e)
return r
log.exception("Could not process apiosintDS")
return {'error': str(e)}
def apiosintParser(response, import_related_hashes):
def apiosintParserHover(ispersistent, response, import_related, stix):
apiosinttype = ['hash', 'ip', 'url', 'domain']
line = "##############################################"
linedot = "--------------------------------------------------------------------"
linedotty = "-------------------"
ret = []
retHover = []
if isinstance(response, dict):
for key in response:
for item in response[key]["items"]:
if item["response"]:
comment = item["item"] + " IS listed by OSINT.digitalside.it. Date list: " + response[key]["list"]["date"]
if key == "url":
if "hashes" in item.keys():
if "sha256" in item["hashes"].keys():
ret.append({"types": ["sha256"], "values": [item["hashes"]["sha256"]]})
if "sha1" in item["hashes"].keys():
ret.append({"types": ["sha1"], "values": [item["hashes"]["sha1"]]})
if "md5" in item["hashes"].keys():
ret.append({"types": ["md5"], "values": [item["hashes"]["md5"]]})
if key in apiosinttype:
for item in response[key]["items"]:
if item["response"]:
comment = "IoC '"+item["item"] + "' found in OSINT.DigitaiSide.it repository. List file: "+response[key]["list"]["file"]+". List date: " + response[key]["list"]["date"]
commentH = "IoC '"+item["item"] + "' found in OSINT.DigitaiSide.it repository."
CommentHDate = "List file: "+response[key]["list"]["file"]+". Date list: " + response[key]["list"]["date"]
ret.append({"types": ["text"], "values": [comment]})
retHover.append({"types": ["text"], "values": [commentH]})
retHover.append({"types": ["text"], "values": [CommentHDate]})
retHover.append({"types": ["text"], "values": [line]})
if key in ["url", "hash"]:
if "hashes" in item:
headhash = "Hashes set"
retHover.append({"types": ["text"], "values": [headhash]})
if "md5" in item["hashes"].keys():
ret.append({"types": ["md5"], "values": [item["hashes"]["md5"]], "comment": "Related to: " + item["item"]})
strmd5 = "MD5: "+item["hashes"]["md5"]
retHover.append({"types": ["text"], "values": [strmd5]})
if "sha1" in item["hashes"].keys():
ret.append({"types": ["sha1"], "values": [item["hashes"]["sha1"]], "comment": "Related to: " + item["item"]})
strsha1 = "SHA1: "+item["hashes"]["sha1"]
retHover.append({"types": ["text"], "values": [strsha1]})
if "sha256" in item["hashes"].keys():
ret.append({"types": ["sha256"], "values": [item["hashes"]["sha256"]], "comment": "Related to: " + item["item"]})
strsha256 = "SHA256: "+item["hashes"]["sha256"]
retHover.append({"types": ["text"], "values": [strsha256]})
if "online_reports" in item:
headReports = "Online Reports (availability depends on retention)"
retHover.append({"types": ["text"], "values": [linedot]})
retHover.append({"types": ["text"], "values": [headReports]})
onlierepor = item["online_reports"]
ret.append({"category": "External analysis", "types": ["link"], "values": [onlierepor["MISP_EVENT"]], "comment": "MISP Event related to: " + item["item"]})
ret.append({"category": "External analysis", "types": ["link"], "values": [onlierepor["MISP_CSV"]], "comment": "MISP CSV related to: " + item["item"]})
ret.append({"category": "External analysis", "types": ["link"], "values": [onlierepor["OSINTDS_REPORT"]], "comment": "DigitalSide report related to: " + item["item"]})
ret.append({"category": "External analysis", "types": ["link"], "values": [onlierepor["STIX"]], "comment": "STIX2 report related to: " + item["item"]})
MISPEVENT = "MISP Event => "+onlierepor["MISP_EVENT"]
MISPCSV = "MISP CSV => "+onlierepor["MISP_CSV"]
OSINTDS = "DigitalSide report => "+onlierepor["OSINTDS_REPORT"]
STIX = "STIX report => "+onlierepor["STIX"]
retHover.append({"types": ["text"], "values": [MISPEVENT]})
retHover.append({"types": ["text"], "values": [MISPCSV]})
retHover.append({"types": ["text"], "values": [OSINTDS]})
retHover.append({"types": ["text"], "values": [STIX]})
if stix and onlierepor:
if "STIXDETAILS" in onlierepor:
retHover.append({"types": ["text"], "values": [linedot]})
headStix = "STIX2 report details"
stixobj = onlierepor["STIXDETAILS"]
stxdet = "TLP:"+stixobj["tlp"]+" | Observation: "+str(stixobj["number_observed"])+" | First seen: "+stixobj["first_observed"]+" | First seen: "+stixobj["last_observed"]
ret.append({"types": ["comment"], "values": [stxdet], "comment": "STIX2 details for: " + item["item"]})
retHover.append({"types": ["text"], "values": [headStix]})
retHover.append({"types": ["text"], "values": [stxdet]})
if stixobj["observed_time_frame"] != False:
obstf = "Observation time frame: "+str(stixobj["observed_time_frame"])
ret.append({"types": ["comment"], "values": [obstf], "comment": "STIX2 details for: " + item["item"]})
retHover.append({"types": ["text"], "values": [obstf]})
filename = stixobj["filename"]
ret.append({"category": "Payload delivery", "types": ["filename"], "values": [filename], "comment": "STIX2 details for: " + item["item"]})
Hovefilename = "Filename: "+filename
retHover.append({"types": ["text"], "values": [Hovefilename]})
filesize = stixobj["filesize"]
ret.append({"types": ["size-in-bytes"], "values": [filesize], "comment": "STIX2 details for: " + item["item"]})
Hovefilesize = "Filesize in bytes: "+str(filesize)
retHover.append({"types": ["text"], "values": [Hovefilesize]})
filetype = stixobj["mime_type"]
ret.append({"category": "Payload delivery", "types": ["mime-type"], "values": [filetype], "comment": "STIX2 details for: " + item["item"]})
Hovemime = "Filetype: "+filetype
retHover.append({"types": ["text"], "values": [Hovemime]})
if "virus_total" in stixobj:
if stixobj["virus_total"] != False:
VTratio = "VirusTotal Ratio: "+str(stixobj["virus_total"]["vt_detection_ratio"])
ret.append({"types": ["comment"], "values": [VTratio], "comment": "STIX2 details for: " + item["item"]})
retHover.append({"types": ["text"], "values": [VTratio]})
if len(item["related_urls"]) > 0:
for urls in item["related_urls"]:
if isinstance(urls, dict):
itemToInclude = urls["url"]
if import_related_hashes:
if "hashes" in urls.keys():
if "sha256" in urls["hashes"].keys():
ret.append({"types": ["sha256"], "values": [urls["hashes"]["sha256"]], "comment": "Related to: " + itemToInclude})
if "sha1" in urls["hashes"].keys():
ret.append({"types": ["sha1"], "values": [urls["hashes"]["sha1"]], "comment": "Related to: " + itemToInclude})
if "md5" in urls["hashes"].keys():
ret.append({"types": ["md5"], "values": [urls["hashes"]["md5"]], "comment": "Related to: " + itemToInclude})
ret.append({"types": ["url"], "values": [itemToInclude], "comment": "Related to: " + item["item"]})
else:
ret.append({"types": ["url"], "values": [urls], "comment": "Related URL to: " + item["item"]})
else:
comment = item["item"] + " IS NOT listed by OSINT.digitalside.it. Date list: " + response[key]["list"]["date"]
ret.append({"types": ["text"], "values": [comment]})
return ret
VTReport = str(stixobj["virus_total"]["vt_report"])
ret.append({"category": "External analysis", "types": ["link"], "values": [VTReport], "comment": "VirusTotal Report for: " + item["item"]})
if import_related:
if len(item["related_urls"]) > 0:
retHover.append({"types": ["text"], "values": [linedot]})
countRelated = "Related URLS count: "+str(len(item["related_urls"]))
retHover.append({"types": ["text"], "values": [countRelated]})
for urls in item["related_urls"]:
if isinstance(urls, dict):
itemToInclude = urls["url"]
ret.append({"types": ["url"], "values": [itemToInclude], "comment": "Download URL for "+urls["hashes"]["md5"]+". Related to: " + item["item"]})
retHover.append({"types": ["text"], "values": [linedot]})
relatedURL = "Related URL "+itemToInclude
retHover.append({"types": ["text"], "values": [relatedURL]})
if "hashes" in urls.keys():
if "md5" in urls["hashes"].keys():
ret.append({"types": ["md5"], "values": [urls["hashes"]["md5"]], "comment": "Related to: " + itemToInclude})
strmd5 = "MD5: "+urls["hashes"]["md5"]
retHover.append({"types": ["text"], "values": [strmd5]})
if "sha1" in urls["hashes"].keys():
ret.append({"types": ["sha1"], "values": [urls["hashes"]["sha1"]], "comment": "Related to: " + itemToInclude})
strsha1 = "SHA1: "+urls["hashes"]["sha1"]
retHover.append({"types": ["text"], "values": [strsha1]})
if "sha256" in urls["hashes"].keys():
ret.append({"types": ["sha256"], "values": [urls["hashes"]["sha256"]], "comment": "Related to: " + itemToInclude})
strsha256 = "SHA256: "+urls["hashes"]["sha256"]
retHover.append({"types": ["text"], "values": [strsha256]})
headReports = "Online Reports (availability depends on retention)"
retHover.append({"types": ["text"], "values": [linedotty]})
retHover.append({"types": ["text"], "values": [headReports]})
onlierepor = urls["online_reports"]
ret.append({"category": "External analysis", "types": ["link"], "values": [onlierepor["MISP_EVENT"]], "comment": "MISP Event related to: " + item["item"]})
ret.append({"category": "External analysis", "types": ["link"], "values": [onlierepor["MISP_CSV"]], "comment": "MISP CSV related to: " + item["item"]})
ret.append({"category": "External analysis", "types": ["link"], "values": [onlierepor["OSINTDS_REPORT"]], "comment": "DigitalSide report related to: " + item["item"]})
ret.append({"category": "External analysis", "types": ["link"], "values": [onlierepor["STIX"]], "comment": "STIX2 report related to: " + item["item"]})
MISPEVENT = "MISP Event => "+onlierepor["MISP_EVENT"]
MISPCSV = "MISP CSV => "+onlierepor["MISP_CSV"]
OSINTDS = "DigitalSide report => "+onlierepor["OSINTDS_REPORT"]
STIX = "STIX report => "+onlierepor["STIX"]
retHover.append({"types": ["text"], "values": [MISPEVENT]})
retHover.append({"types": ["text"], "values": [MISPCSV]})
retHover.append({"types": ["text"], "values": [OSINTDS]})
retHover.append({"types": ["text"], "values": [STIX]})
if stix and onlierepor:
if "STIXDETAILS" in onlierepor:
retHover.append({"types": ["text"], "values": [linedotty]})
headStix = "STIX2 report details"
stixobj = onlierepor["STIXDETAILS"]
stxdet = "TLP:"+stixobj["tlp"]+" | Observation: "+str(stixobj["number_observed"])+" | First seen: "+stixobj["first_observed"]+" | First seen: "+stixobj["last_observed"]
ret.append({"types": ["comment"], "values": [stxdet], "comment": "STIX2 details for: " + item["item"]})
retHover.append({"types": ["text"], "values": [headStix]})
retHover.append({"types": ["text"], "values": [stxdet]})
if stixobj["observed_time_frame"] != False:
obstf = "Observation time frame: "+str(stixobj["observed_time_frame"])
ret.append({"types": ["comment"], "values": [obstf], "comment": "STIX2 details for: " + item["item"]})
retHover.append({"types": ["text"], "values": [obstf]})
filename = stixobj["filename"]
ret.append({"category": "Payload delivery", "types": ["filename"], "values": [filename], "comment": "STIX2 details for: " + item["item"]})
Hovefilename = "Filename: "+filename
retHover.append({"types": ["text"], "values": [Hovefilename]})
filesize = stixobj["filesize"]
ret.append({"types": ["size-in-bytes"], "values": [filesize], "comment": "STIX2 details for: " + item["item"]})
Hovefilesize = "Filesize in bytes: "+str(filesize)
retHover.append({"types": ["text"], "values": [Hovefilesize]})
filetype = stixobj["mime_type"]
ret.append({"category": "Payload delivery", "types": ["mime-type"], "values": [filetype], "comment": "STIX2 details for: " + item["item"]})
Hovemime = "Filetype: "+filetype
retHover.append({"types": ["text"], "values": [Hovemime]})
if "virus_total" in stixobj:
if stixobj["virus_total"] != False:
VTratio = "VirusTotal Ratio: "+stixobj["virus_total"]["vt_detection_ratio"]
ret.append({"types": ["comment"], "values": [VTratio], "comment": "STIX2 details for: " + item["item"]})
retHover.append({"types": ["text"], "values": [VTratio]})
VTReport = stixobj["virus_total"]["vt_report"]
ret.append({"category": "External analysis", "types": ["link"], "values": [VTReport], "comment": "VirusTotal Report for: " + item["item"]})
else:
ret.append({"types": ["url"], "values": [urls], "comment": "Download URL for: " + item["item"]})
urlHover = "URL => "+urls
retHover.append({"types": ["text"], "values": [urlHover]})
else:
notfound = item["item"] + " IS NOT listed by OSINT.digitalside.it. Date list: " + response[key]["list"]["date"]
ret.append({"types": ["comment"], "values": [notfound]})
retHover.append({"types": ["comment"], "values": [notfound]})
if ispersistent == 0:
return ret
return retHover
def introspection():

View File

@ -27,7 +27,7 @@ def handler(q=False):
btc = request['btc']
query = f"{btc}.{url}"
try:
result = ' - '.join([str(r) for r in resolver.query(query, 'TXT')])[1:-1]
result = ' - '.join([str(r) for r in resolver.resolve(query, 'TXT')])[1:-1]
except NXDOMAIN:
result = f"{btc} is not known as a scam address."
except LabelTooLong:

View File

@ -1,4 +1,3 @@
import json
import pypdns
from . import check_input_attribute, standard_error_message
from pymisp import MISPAttribute, MISPEvent, MISPObject
@ -10,7 +9,7 @@ moduleinfo = {'version': '0.2', 'author': 'Alexandre Dulaunoy',
moduleconfig = ['username', 'password']
class PassiveDNSParser():
class PassiveDNSParser:
def __init__(self, attribute, authentication):
self.misp_event = MISPEvent()
self.attribute = MISPAttribute()
@ -21,7 +20,7 @@ class PassiveDNSParser():
def get_results(self):
if hasattr(self, 'result'):
return self.result
event = json.loads(self.misp_event.to_json())
event = self.misp_event.to_dict()
results = {key: event[key] for key in ('Attribute', 'Object')}
return {'results': results}
@ -50,10 +49,7 @@ class PassiveDNSParser():
self.misp_event.add_object(**pdns_object)
def handler(q=False):
if q is False:
return False
request = json.loads(q)
def dict_handler(request: dict):
if not request.get('config'):
return {'error': 'CIRCL Passive DNS authentication is missing.'}
if not request['config'].get('username') or not request['config'].get('password'):

View File

@ -1,6 +1,5 @@
import base64
import io
import json
import logging
import sys
import zipfile
@ -43,7 +42,7 @@ def create_response(original_attribute: dict, software: str, signature: Optional
av_signature_object.add_reference(original_attribute["uuid"], "belongs-to")
misp_event.add_object(av_signature_object)
event = json.loads(misp_event.to_json())
event = misp_event.to_dict()
results = {key: event[key] for key in ('Attribute', 'Object') if (key in event and event[key])}
return {"results": results}
@ -58,12 +57,7 @@ def connect_to_clamav(connection_string: str) -> clamd.ClamdNetworkSocket:
raise Exception("ClamAV connection string is invalid. It must be unix socket path with 'unix://' prefix or IP:PORT.")
def handler(q=False):
if q is False:
return False
request = json.loads(q)
def dict_handler(request: dict):
connection_string: str = request["config"].get("connection")
if not connection_string:
return {"error": "No ClamAV connection string provided"}

View File

@ -0,0 +1,230 @@
import json
import requests
import uuid
from . import check_input_attribute, standard_error_message
from pymisp import MISPAttribute, MISPEvent, MISPObject
moduleinfo = {'version': '0.1',
'author': 'Milo Volpicelli',
'description': 'Module to query Cluster25CTI',
'module-type': ['expansion', 'hover']}
moduleconfig = ['api_id', 'apikey', 'base_url']
misperrors = {'error': 'Error'}
misp_type_in = ['domain', 'email-src', 'email-dst', 'filename', 'md5', 'sha1', 'sha256', 'ip-src', 'ip-dst', 'url',
'vulnerability', 'btc', 'xmr', 'ja3-fingerprint-md5']
mapping_out = { # mapping between the MISP attributes type and the compatible Cluster25 indicator types.
'domain': {'type': 'domain', 'to_ids': True},
'email-src': {'type': 'email-src', 'to_ids': True},
'email-dst': {'type': 'email-dst', 'to_ids': True},
'filename': {'type': 'filename', 'to_ids': True},
'md5': {'type': 'md5', 'to_ids': True},
'sha1': {'type': 'sha1', 'to_ids': True},
'sha256': {'type': 'sha256', 'to_ids': True},
'ip-src': {'type': 'ip-src', 'to_ids': True},
'ip-dst': {'type': 'ip-dst', 'to_ids': True},
'url': {'type': 'url', 'to_ids': True},
'cve': {'type': 'vulnerability', 'to_ids': True},
'btcaddress': {'type': 'btc', 'to_ids': True},
'xmraddress': {'type': 'xmr', 'to_ids': True},
'ja3': {'type': 'ja3-fingerprint-md5', 'to_ids': True},
}
misp_type_out = [item['type'] for item in mapping_out.values()]
misp_attributes = {'input': misp_type_in, 'format': 'misp_standard'}
def handler(q=False):
if q is False:
return False
request = json.loads(q)
# validate Cluster25 params
if request.get('config'):
if request['config'].get('apikey') is None:
misperrors['error'] = 'Cluster25 apikey is missing'
return misperrors
if request['config'].get('api_id') is None:
misperrors['error'] = 'Cluster25 api_id is missing'
return misperrors
if request['config'].get('base_url') is None:
misperrors['error'] = 'Cluster25 base_url is missing'
return misperrors
# validate attribute
if not request.get('attribute') or not check_input_attribute(request['attribute']):
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
attribute = request.get('attribute')
if not any(input_type == attribute.get('type') for input_type in misp_type_in):
return {'error': 'Unsupported attribute type.'}
client = Cluster25CTI(request['config']['api_id'], request['config']['apikey'], request['config']['base_url'])
return lookup_indicator(client, request.get('attribute'))
def format_content(content):
if isinstance(content, str) or isinstance(content, bool) or isinstance(content, int) or isinstance(content, float):
return content
ret = ""
tmp_ret = []
if content is None:
return ret
is_dict = isinstance(content, dict)
is_list = isinstance(content, list)
for index, key in enumerate(content):
if is_dict:
if isinstance(content[key], dict):
ret = format_content(content[key])
elif isinstance(content[key], list):
for list_item in content[key]:
tmp_ret.append(format_content(list_item))
else:
tmp_ret.append(f"{key}: {content[key]}")
elif is_list:
if isinstance(content[index], str):
ret = ", ".join(content)
else:
ret = format_content(content)
if tmp_ret:
ret = " ".join(tmp_ret)
return ret
def lookup_indicator(client, attr):
result = client.investigate(attr)
if result.get('error'):
return result
misp_event = MISPEvent()
attribute = MISPAttribute()
attribute.from_dict(**attr)
misp_event.add_attribute(**attribute)
misp_object_g = MISPObject('c25_generic_info')
misp_object_g.template_uuid = uuid.uuid4()
misp_object_g.description = 'c25_generic_info'
setattr(misp_object_g, 'meta-category', 'network')
misp_objects = []
for ind, entry in enumerate(result):
if isinstance(result[entry], dict):
tmp_obj = MISPObject(f"c25_{entry}")
tmp_obj.template_uuid = uuid.uuid4()
tmp_obj.description = f"c25_{entry}"
setattr(tmp_obj, 'meta-category', 'network')
tmp_obj.add_reference(attribute['uuid'], 'related-to')
for key in result[entry]:
if isinstance(result[entry][key], dict):
for index, item in enumerate(result[entry][key]):
if result[entry][key][item]:
tmp_obj.add_attribute(f"{entry}_{key}_{item}", **{'type': 'text', 'value': format_content(
result[entry][key][item])})
elif isinstance(result[entry][key], list):
for index, item in enumerate(result[entry][key]):
if isinstance(item, dict):
tmp_obj_2 = MISPObject(f"c25_{entry}_{key}_{index + 1}")
tmp_obj_2.template_uuid = uuid.uuid4()
tmp_obj_2.description = f"c25_{entry}_{key}"
setattr(tmp_obj_2, 'meta-category', 'network')
tmp_obj_2.add_reference(attribute['uuid'], 'related-to')
for sub_key in item:
if isinstance(item[sub_key], list):
for sub_item in item[sub_key]:
if isinstance(sub_item, dict):
tmp_obj_3 = MISPObject(f"c25_{entry}_{sub_key}_{index + 1}")
tmp_obj_3.template_uuid = uuid.uuid4()
tmp_obj_3.description = f"c25_{entry}_{sub_key}"
setattr(tmp_obj_3, 'meta-category', 'network')
tmp_obj_3.add_reference(attribute['uuid'], 'related-to')
for sub_sub_key in sub_item:
if isinstance(sub_item[sub_sub_key], list):
for idx, sub_sub_item in enumerate(sub_item[sub_sub_key]):
if sub_sub_item.get("name"):
sub_sub_item = sub_sub_item.get("name")
tmp_obj_3.add_attribute(f"{sub_sub_key}_{idx + 1}",
**{'type': 'text',
'value': format_content(
sub_sub_item)})
else:
tmp_obj_3.add_attribute(sub_sub_key,
**{'type': 'text',
'value': format_content(
sub_item[sub_sub_key])})
misp_objects.append(tmp_obj_3)
else:
tmp_obj_2.add_attribute(sub_key, **{'type': 'text',
'value': format_content(sub_item)})
elif item[sub_key]:
tmp_obj_2.add_attribute(sub_key,
**{'type': 'text', 'value': format_content(item[sub_key])})
misp_objects.append(tmp_obj_2)
elif item is not None:
tmp_obj.add_attribute(f"{entry}_{key}", **{'type': 'text', 'value': format_content(item)})
elif result[entry][key] is not None:
tmp_obj.add_attribute(key, **{'type': 'text', 'value': result[entry][key]})
if tmp_obj.attributes:
misp_objects.append(tmp_obj)
elif isinstance(result[entry], list):
for index, key in enumerate(result[entry]):
if isinstance(key, dict):
tmp_obj = MISPObject(f"c25_{entry}_{index + 1}")
tmp_obj.template_uuid = uuid.uuid4()
tmp_obj.description = f"c25_{entry}_{index + 1}"
setattr(tmp_obj, 'meta-category', 'network')
tmp_obj.add_reference(attribute['uuid'], 'related-to')
for item in key:
if key[item]:
tmp_obj.add_attribute(item, **{'type': 'text', 'value': format_content(key[item])})
tmp_obj.add_reference(attribute['uuid'], 'related-to')
misp_objects.append(tmp_obj)
elif key is not None:
misp_object_g.add_attribute(f"{entry}_{index + 1}",
**{'type': 'text', 'value': format_content(key)})
else:
if result[entry]:
misp_object_g.add_attribute(entry, **{'type': 'text', 'value': result[entry]})
misp_object_g.add_reference(attribute['uuid'], 'related-to')
misp_event.add_object(misp_object_g)
for misp_object in misp_objects:
misp_event.add_object(misp_object)
event = json.loads(misp_event.to_json())
results = {key: event[key] for key in ('Attribute', 'Object')}
return {'results': results}
def introspection():
return misp_attributes
def version():
moduleinfo['config'] = moduleconfig
return moduleinfo
class Cluster25CTI:
def __init__(self, customer_id=None, customer_key=None, base_url=None):
self.client_id = customer_id
self.client_secret = customer_key
self.base_url = base_url
self.current_token = self._get_cluster25_token()
self.headers = {"Authorization": f"Bearer {self.current_token}"}
def _get_cluster25_token(self):
payload = {"client_id": self.client_id, "client_secret": self.client_secret}
r = requests.post(url=f"{self.base_url}/token", json=payload, headers={"Content-Type": "application/json"})
if r.status_code != 200:
return {'error': f"Unable to retrieve the token from C25 platform, status {r.status_code}"}
return r.json()["data"]["token"]
def investigate(self, indicator) -> dict:
params = {'indicator': indicator.get('value')}
r = requests.get(url=f"{self.base_url}/investigate", params=params, headers=self.headers)
if r.status_code != 200:
return {'error': f"Unable to retrieve investigate result for indicator '{indicator.get('value')}' "
f"from C25 platform, status {r.status_code}"}
return r.json()["data"]

View File

@ -50,7 +50,7 @@ def handler(q=False):
return misperrors
query = "{}.{}".format(requested_value, dbl)
try:
query_result = resolver.query(query, 'A')[0]
query_result = resolver.resolve(query, 'A')[0]
result = "{} - {}".format(requested_value, dbl_mapping[str(query_result)])
except dns.resolver.NXDOMAIN as e:
result = e.msg

View File

@ -36,7 +36,7 @@ def handler(q=False):
r.nameservers = ['8.8.8.8']
try:
answer = r.query(toquery, 'A')
answer = r.resolve(toquery, 'A')
except dns.resolver.NXDOMAIN:
misperrors['error'] = "NXDOMAIN"
return misperrors

View File

@ -22,9 +22,12 @@ def createObjectFromURL(url):
if parsed['subdomain'] is not None:
obj.add_attribute('subdomain', type='text', value=parsed['subdomain'])
obj.add_attribute('scheme', type='text', value=parsed['scheme'])
obj.add_attribute('resource_path', type='text', value=parsed['resource_path'])
obj.add_attribute('query_string', type='text', value=parsed['query_string'])
obj.add_attribute('port', type='port', value=parsed['port'])
if parsed['resource_path'] is not None:
obj.add_attribute('resource_path', type='text', value=parsed['resource_path'])
if parsed['query_string'] is not None:
obj.add_attribute('query_string', type='text', value=parsed['query_string'])
if parsed['port'] is not None:
obj.add_attribute('port', type='port', value=parsed['port'])
obj.add_attribute('host', type='hostname', value=parsed['host'])
if parsed['fragment'] is not None:
obj.add_attribute('fragment', type='text', value=parsed['fragment'])

View File

@ -0,0 +1,76 @@
# import requests
import json
from pymisp import MISPObject, MISPAttribute, MISPEvent
from . import check_input_attribute, checking_error, standard_error_message
from pysafebrowsing import SafeBrowsing
misperrors = {'error': 'Error'}
mispattributes = {'input': ['url'], 'format': 'misp_standard'}
moduleinfo = {'version': '0.1', 'author': 'Stephanie S',
'description': 'Google safe browsing expansion module',
'module-type': ['expansion', 'hover']}
moduleconfig = ['api_key']
def handler(q=False):
if q is False:
return False
request = json.loads(q)
if "config" not in request or "api_key" not in request["config"]:
return {"error": "Google Safe Browsing API key is missing"}
if not request.get('attribute') or not check_input_attribute(request['attribute'], requirements=('type', 'value')):
return {'error': f'{standard_error_message}, {checking_error}.'}
if request['attribute']['type'] not in mispattributes['input']:
return {'error': 'Unsupported attribute type.'}
api_key = request["config"]["api_key"]
url = request["attribute"]["value"]
s = SafeBrowsing(api_key)
try:
response = s.lookup_urls([url])
event = MISPEvent()
obj = MISPObject('google-safe-browsing')
event.add_attribute(**request['attribute'])
if (response[url]['malicious'] != False):
# gsb threat types: THREAT_TYPE_UNSPECIFIED, MALWARE, SOCIAL_ENGINEERING, UNWANTED_SOFTWARE, POTENTIALLY_HARMFUL_APPLICATION
gsb_circl_threat_taxonomy = {"MALWARE": 'malware', "SOCIAL_ENGINEERING": 'social-engineering'}
threats = response[url]['threats']
malicious = response[url]['malicious']
platforms = response[url]['platforms']
malicious_attribute = obj.add_attribute('malicious', **{'type': 'boolean', 'value': malicious})
malicious_attribute.add_tag(f'ioc:artifact-state="malicious"')
threat_attribute = obj.add_attribute('threats', **{'type': 'text', 'value': str(" ".join(threats))})
for threat in threats:
# If the threat exists as a key in taxonomy_dict, add that tag
if (gsb_circl_threat_taxonomy.get(threat) is not None):
threat_attribute.add_tag(f'circl:incident="{gsb_circl_threat_taxonomy.get(threat)}"')
else:
threat_attribute.add_tag(f'threat-type:{str(threat).lower()}')
obj.add_attribute('platforms', **{'type': 'text', 'value': str(" ".join(platforms))})
else:
malicious_attribute = obj.add_attribute('malicious', **{'type': 'boolean', 'value': 0}) # 0 == False
malicious_attribute.add_tag(f'ioc:artifact-state="not-malicious"')
obj.add_reference(request['attribute']['uuid'], "describes")
event.add_object(obj)
# Avoid serialization issue
event = json.loads(event.to_json())
return {"results": {'Object': event['Object'], 'Attribute': event['Attribute']}}
except Exception as error:
return {"error": "An error occurred: " + str(error)}
def introspection():
return mispattributes
def version():
moduleinfo['config'] = moduleconfig
return moduleinfo

View File

@ -295,6 +295,7 @@ class RequestHandler:
headers = {
'Content-type': 'application/json',
'X-API-Key': self.api_key,
'User-Agent': 'Misp Modules'
}
req_body = request_body(query_input, query_param, current)
try:

View File

@ -0,0 +1,83 @@
import json
import requests
from . import check_input_attribute, standard_error_message
from pymisp import MISPAttribute, MISPEvent, MISPObject
mispattributes = {
'input': ['ip-src', 'ip-dst'],
'format': 'misp_standard'
}
moduleinfo = {
'version': 1,
'author': 'IP2Location.io',
'description': 'An expansion module to query IP2Location.io for additional information on an IP address',
'module-type': ['expansion', 'hover']
}
moduleconfig = ['key']
_GEOLOCATION_OBJECT_MAPPING = {
'country_code': 'countrycode',
'country_name': 'country',
'region_name': 'region',
'city_name': 'city',
'zip_code': 'zipcode',
'latitude': 'latitude',
'longitude': 'longitude'
}
def handler(q=False):
# Input checks
if q is False:
return False
request = json.loads(q)
if not request.get('attribute') or not check_input_attribute(request['attribute']):
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
attribute = request['attribute']
if attribute.get('type') not in mispattributes['input']:
return {'error': 'Wrong input attribute type.'}
if not request.get('config'):
return {'error': 'Missing ip2locationio config.'}
if not request['config'].get('key'):
return {'error': 'Missing ip2locationio API key.'}
# Query ip2location.io
query = requests.get(
f"https://api.ip2location.io/json?key={request['config']['key']}&ip={attribute['value']}"
)
if query.status_code != 200:
return {'error': f'Error while querying ip2location.io - {query.status_code}: {query.reason}'}
iplio_result = query.json()
# Check if the IP address is not reserved for special use
# if ipinfo.get('bogon', False):
if '' in iplio_result and iplio_result[''] == 'RSV':
return {'error': 'The IP address is reserved for special use'}
# Initiate the MISP data structures
misp_event = MISPEvent()
input_attribute = MISPAttribute()
input_attribute.from_dict(**attribute)
misp_event.add_attribute(**input_attribute)
# Parse the geolocation information related to the IP address
geolocation = MISPObject('geolocation')
for field, relation in _GEOLOCATION_OBJECT_MAPPING.items():
geolocation.add_attribute(relation, iplio_result[field])
geolocation.add_reference(input_attribute.uuid, 'locates')
misp_event.add_object(geolocation)
# Return the results in MISP format
event = json.loads(misp_event.to_json())
return {
'results': {key: event[key] for key in ('Attribute', 'Object')}
}
def introspection():
return mispattributes
def version():
moduleinfo['config'] = moduleconfig
return moduleinfo

View File

@ -27,7 +27,7 @@ def handler(q=False):
q = json.loads(q)
filename = q['attachment']
try:
img_array = np.fromstring(binascii.a2b_base64(q['data']), np.uint8)
img_array = np.frombuffer(binascii.a2b_base64(q['data']), np.uint8)
except Exception as e:
err = "Couldn't fetch attachment (JSON 'data' is empty). Are you using the 'Query enrichment' action?"
misperrors['error'] = err

View File

@ -42,7 +42,7 @@ def handler(q=False):
r.nameservers = ['8.8.8.8']
try:
answer = r.query(revname, 'PTR')
answer = r.resolve(revname, 'PTR')
except resolver.NXDOMAIN:
misperrors['error'] = "NXDOMAIN"
return misperrors

View File

@ -0,0 +1,293 @@
# -*- coding: utf-8 -*-
import base64
import numpy as np
import matplotlib.pyplot as plt
import io
import json
import tempfile
import logging
import sys
from pymisp import MISPObject, MISPEvent
from sigmf import SigMFFile
from sigmf.archive import SIGMF_DATASET_EXT, SIGMF_METADATA_EXT
import tarfile
log = logging.getLogger("sigmf-expand")
log.setLevel(logging.DEBUG)
sh = logging.StreamHandler(sys.stdout)
sh.setLevel(logging.DEBUG)
fmt = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
sh.setFormatter(fmt)
log.addHandler(sh)
misperrors = {'error': 'Error'}
mispattributes = {'input': ['sigmf-recording', 'sigmf-archive'], 'output': [
'MISP objects'], 'format': 'misp_standard'}
moduleinfo = {'version': '0.1', 'author': 'Luciano Righetti',
'description': 'Expands a SigMF Recording object into a SigMF Expanded Recording object, extracts a SigMF archive into a SigMF Recording object.',
'module-type': ['expansion']}
def get_samples(data_bytes, data_type) -> np.ndarray:
"""
Get samples from bytes.
Source: https://github.com/IQEngine/IQEngine/blob/main/api/rf/samples.py
Parameters
----------
data_bytes : bytes
The bytes to convert to samples.
data_type : str
The data type of the bytes.
Returns
-------
np.ndarray
The samples.
"""
if data_type == "ci16_le" or data_type == "ci16":
samples = np.frombuffer(data_bytes, dtype=np.int16)
samples = samples[::2] + 1j * samples[1::2]
elif data_type == "cf32_le":
samples = np.frombuffer(data_bytes, dtype=np.complex64)
elif data_type == "ci8" or data_type == "i8":
samples = np.frombuffer(data_bytes, dtype=np.int8)
samples = samples[::2] + 1j * samples[1::2]
else:
raise ("Datatype " + data_type + " not implemented")
return samples
def generate_plots(recording, meta_filename, data_bytes):
# FFT plot
filename = meta_filename.replace('.sigmf-data', '')
samples = get_samples(
data_bytes, recording.get_global_info()['core:datatype'])
sample_rate = recording.get_global_info()['core:sample_rate']
# Waterfall plot
# snippet from https://pysdr.org/content/frequency_domain.html#fast-fourier-transform-fft
fft_size = 1024
# // is an integer division which rounds down
num_rows = len(samples) // fft_size
spectrogram = np.zeros((num_rows, fft_size))
for i in range(num_rows):
spectrogram[i, :] = 10 * \
np.log10(np.abs(np.fft.fftshift(
np.fft.fft(samples[i*fft_size:(i+1)*fft_size])))**2)
plt.figure(figsize=(10, 4))
plt.title(filename)
plt.imshow(spectrogram, aspect='auto', extent=[
sample_rate/-2/1e6, sample_rate/2/1e6, 0, len(samples)/sample_rate])
plt.xlabel("Frequency [MHz]")
plt.ylabel("Time [ms]")
plt.savefig(filename + '-spectrogram.png')
waterfall_buff = io.BytesIO()
plt.savefig(waterfall_buff, format='png')
waterfall_buff.seek(0)
waterfall_png = base64.b64encode(waterfall_buff.read()).decode('utf-8')
waterfall_attr = {
'type': 'attachment',
'value': filename + '-waterfall.png',
'data': waterfall_png,
'comment': 'Waterfall plot of the recording'
}
return [{'relation': 'waterfall-plot', 'attribute': waterfall_attr}]
def process_sigmf_archive(object):
event = MISPEvent()
sigmf_data_attr = None
sigmf_meta_attr = None
try:
# get sigmf-archive attribute
for attribute in object['Attribute']:
if attribute['object_relation'] == 'SigMF-archive':
# write temp data file to disk
sigmf_archive_file = tempfile.NamedTemporaryFile(
suffix='.sigmf')
sigmf_archive_bin = base64.b64decode(attribute['data'])
with open(sigmf_archive_file.name, 'wb') as f:
f.write(sigmf_archive_bin)
f.close()
sigmf_tarfile = tarfile.open(
sigmf_archive_file.name, mode="r", format=tarfile.PAX_FORMAT)
files = sigmf_tarfile.getmembers()
for file in files:
if file.name.endswith(SIGMF_METADATA_EXT):
metadata_reader = sigmf_tarfile.extractfile(file)
sigmf_meta_attr = {
'type': 'attachment',
'value': file.name,
'data': base64.b64encode(metadata_reader.read()).decode("utf-8"),
'comment': 'SigMF metadata file',
'object_relation': 'SigMF-meta'
}
if file.name.endswith(SIGMF_DATASET_EXT):
data_reader = sigmf_tarfile.extractfile(file)
sigmf_data_attr = {
'type': 'attachment',
'value': file.name,
'data': base64.b64encode(data_reader.read()).decode("utf-8"),
'comment': 'SigMF data file',
'object_relation': 'SigMF-data'
}
if sigmf_meta_attr is None:
return {"error": "No SigMF metadata file found"}
recording = MISPObject('sigmf-recording')
recording.add_attribute(**sigmf_meta_attr)
recording.add_attribute(**sigmf_data_attr)
# add reference to original SigMF Archive object
recording.add_reference(object['uuid'], "expands")
event.add_object(recording)
event = json.loads(event.to_json())
return {"results": {'Object': event['Object']}}
# no sigmf-archive attribute found
return {"error": "No SigMF-archive attribute found"}
except Exception as e:
logging.exception(e)
return {"error": "An error occured when processing the SigMF archive"}
def process_sigmf_recording(object):
event = MISPEvent()
for attribute in object['Attribute']:
if attribute['object_relation'] == 'SigMF-data':
sigmf_data_attr = attribute
if attribute['object_relation'] == 'SigMF-meta':
sigmf_meta_attr = attribute
if sigmf_meta_attr is None:
return {"error": "No SigMF-data attribute"}
if sigmf_data_attr is None:
return {"error": "No SigMF-meta attribute"}
try:
sigmf_meta = base64.b64decode(sigmf_meta_attr['data']).decode('utf-8')
sigmf_meta = json.loads(sigmf_meta)
except Exception as e:
logging.exception(e)
return {"error": "Provided .sigmf-meta is not a valid JSON string"}
# write temp data file to disk
sigmf_data_file = tempfile.NamedTemporaryFile(suffix='.sigmf-data')
sigmf_data_bin = base64.b64decode(sigmf_data_attr['data'])
with open(sigmf_data_file.name, 'wb') as f:
f.write(sigmf_data_bin)
f.close()
try:
recording = SigMFFile(
metadata=sigmf_meta,
data_file=sigmf_data_file.name
)
except Exception as e:
logging.exception(e)
return {"error": "Provided .sigmf-meta and .sigmf-data is not a valid SigMF file"}
expanded_sigmf = MISPObject('sigmf-expanded-recording')
if 'core:author' in sigmf_meta['global']:
expanded_sigmf.add_attribute(
'author', **{'type': 'text', 'value': sigmf_meta['global']['core:author']})
if 'core:datatype' in sigmf_meta['global']:
expanded_sigmf.add_attribute(
'datatype', **{'type': 'text', 'value': sigmf_meta['global']['core:datatype']})
if 'core:description' in sigmf_meta['global']:
expanded_sigmf.add_attribute(
'description', **{'type': 'text', 'value': sigmf_meta['global']['core:description']})
if 'core:license' in sigmf_meta['global']:
expanded_sigmf.add_attribute(
'license', **{'type': 'text', 'value': sigmf_meta['global']['core:license']})
if 'core:num_channels' in sigmf_meta['global']:
expanded_sigmf.add_attribute(
'num_channels', **{'type': 'counter', 'value': sigmf_meta['global']['core:num_channels']})
if 'core:recorder' in sigmf_meta['global']:
expanded_sigmf.add_attribute(
'recorder', **{'type': 'text', 'value': sigmf_meta['global']['core:recorder']})
if 'core:sample_rate' in sigmf_meta['global']:
expanded_sigmf.add_attribute(
'sample_rate', **{'type': 'float', 'value': sigmf_meta['global']['core:sample_rate']})
if 'core:sha512' in sigmf_meta['global']:
expanded_sigmf.add_attribute(
'sha512', **{'type': 'text', 'value': sigmf_meta['global']['core:sha512']})
if 'core:version' in sigmf_meta['global']:
expanded_sigmf.add_attribute(
'version', **{'type': 'text', 'value': sigmf_meta['global']['core:version']})
# add reference to original SigMF Recording object
expanded_sigmf.add_reference(object['uuid'], "expands")
# add FFT and waterfall plot
try:
plots = generate_plots(
recording, sigmf_data_attr['value'], sigmf_data_bin)
except Exception as e:
logging.exception(e)
return {"error": "Could not generate plots"}
for plot in plots:
expanded_sigmf.add_attribute(plot['relation'], **plot['attribute'])
event.add_object(expanded_sigmf)
event = json.loads(event.to_json())
return {"results": {'Object': event['Object']}}
def handler(q=False):
request = json.loads(q)
object = request.get("object")
event = MISPEvent()
if not object:
return {"error": "No object provided"}
if 'Attribute' not in object:
return {"error": "Empty Attribute list"}
# check if it's a SigMF Archive
if object['name'] == 'sigmf-archive':
return process_sigmf_archive(object)
# check if it's a SigMF Recording
if object['name'] == 'sigmf-recording':
return process_sigmf_recording(object)
# TODO: add support for SigMF Collection
return {"error": "No SigMF object provided"}
def introspection():
return mispattributes
def version():
return moduleinfo

View File

@ -1,4 +1,3 @@
import json
from urllib.parse import urlparse
import vt
from . import check_input_attribute, standard_error_message
@ -45,7 +44,7 @@ class VirusTotalParser:
self.input_types_mapping[self.attribute.type](self.attribute.value)
def get_result(self) -> dict:
event = json.loads(self.misp_event.to_json())
event = self.misp_event.to_dict()
results = {key: event[key] for key in ('Attribute', 'Object') if (key in event and event[key])}
return {'results': results}
@ -153,11 +152,11 @@ class VirusTotalParser:
('contacted_domains', 'communicates-with'),
('contacted_ips', 'communicates-with')
]:
files_iterator = self.client.iterator(f'/files/{file_report.id}/{relationship_name}', limit=self.limit)
for file in files_iterator:
file_object = self.create_misp_object(file)
file_object.add_reference(file_object.uuid, misp_name)
self.misp_event.add_object(**file_object)
related_files_iterator = self.client.iterator(f'/files/{file_report.id}/{relationship_name}', limit=self.limit)
for related_file in related_files_iterator:
related_file_object = self.create_misp_object(related_file)
related_file_object.add_reference(file_object.uuid, misp_name)
self.misp_event.add_object(**related_file_object)
self.misp_event.add_object(**file_object)
return file_object.uuid
@ -257,10 +256,7 @@ def parse_error(status_code: int) -> str:
return "VirusTotal may not be accessible."
def handler(q=False):
if q is False:
return False
request = json.loads(q)
def dict_handler(request: dict):
if not request.get('config') or not request['config'].get('apikey'):
misperrors['error'] = 'A VirusTotal api key is required for this module.'
return misperrors

View File

@ -138,11 +138,11 @@ class VirusTotalParser:
('contacted_domains', 'communicates-with'),
('contacted_ips', 'communicates-with')
]:
files_iterator = self.client.iterator(f'/files/{file_report.id}/{relationship_name}', limit=self.limit)
for file in files_iterator:
file_object = self.create_misp_object(file)
file_object.add_reference(file_object.uuid, misp_name)
self.misp_event.add_object(**file_object)
related_files_iterator = self.client.iterator(f'/files/{file_report.id}/{relationship_name}', limit=self.limit)
for related_file in related_files_iterator:
related_file_object = self.create_misp_object(related_file)
related_file_object.add_reference(file_object.uuid, misp_name)
self.misp_event.add_object(**related_file_object)
self.misp_event.add_object(**file_object)
return file_object.uuid

View File

@ -34,7 +34,9 @@ def handler(q=False):
if 'score' in vulners_document.get('enchantments', {}):
vulners_ai_score = vulners_document['enchantments']['score']['value']
else:
vulners_ai_score = None
vulners_ai_score = vulners_api.get_ai_score(vulnerability)
if len(vulners_ai_score) == 2:
vulners_ai_score = vulners_ai_score[0]
vulners_exploits = vulners_api.searchExploit(vulnerability)
@ -44,7 +46,7 @@ def handler(q=False):
vuln_summary += 'Non existing CVE'
if vulners_ai_score:
ai_summary += 'Vulners AI Score is ' + str(vulners_ai_score[0]) + " "
ai_summary += 'Vulners AI Score is ' + str(vulners_ai_score) + " "
if vulners_exploits:
exploit_summary += " || " + str(len(vulners_exploits)) + " Public exploits available:\n "

View File

@ -0,0 +1,212 @@
import json
from pymisp import MISPAttribute, MISPEvent
from urllib.parse import urlparse
import logging
import vysion.client as vysion
import vysion.dto as dto
from vysion.dto.util import MISPProcessor
misperrors = {"error": "Error"}
mispattributes = {
"input": [
"email",
"domain",
"hostname",
"url",
"text",
"btc",
"phone-number",
"target-org",
],
"format": "misp_standard",
}
# possible module-types: 'expansion', 'hover' or both
moduleinfo = {
"version": "1",
"author": "Byron Labs",
"description": "Enrich observables with the Vysion API",
"module-type": ["expansion"],
}
# config fields that your code expects from the site admin
moduleconfig = [
"apikey",
"event_limit",
"proxy_host",
"proxy_port",
"proxy_username",
"proxy_password",
]
LOGGER = logging.getLogger("vysion")
LOGGER.setLevel(logging.INFO)
LOGGER.info("Starting Vysion")
DEFAULT_RESULTS_LIMIT = 10
def get_proxy_settings(config: dict) -> dict:
"""Returns proxy settings in the requests format.
If no proxy settings are set, return None."""
proxies = None
host = config.get("proxy_host")
port = config.get("proxy_port")
username = config.get("proxy_username")
password = config.get("proxy_password")
if host:
if not port:
misperrors["error"] = (
"The vysion_proxy_host config is set, "
"please also set the vysion_proxy_port."
)
raise KeyError
parsed = urlparse(host)
if "http" in parsed.scheme:
scheme = "http"
else:
scheme = parsed.scheme
netloc = parsed.netloc
host = f"{netloc}:{port}"
if username:
if not password:
misperrors["error"] = (
"The vysion_proxy_username config is set, "
"please also set the vysion_proxy_password."
)
raise KeyError
auth = f"{username}:{password}"
host = auth + "@" + host
proxies = {"http": f"{scheme}://{host}", "https": f"{scheme}://{host}"}
return proxies
def parse_error(status_code: int) -> str:
status_mapping = {
500: "Vysion is blind.",
400: "Incorrect request, please check the arguments.",
403: "You don't have enough privileges to make the request.",
}
if status_code in status_mapping:
return status_mapping[status_code]
return "Vysion may not be accessible."
def handler(q=False):
if q is False:
return False
request = json.loads(q)
if not request.get("config") or not request["config"].get("apikey"):
misperrors["error"] = "A Vysion api key is required for this module."
return misperrors
if not request.get("attribute"):
return {
"error": "The request is missing required attribute information, which should contain at least a type, a value, and a UUID."
}
if request["attribute"]["type"] not in mispattributes["input"]:
return {"error": "Unsupported attribute type."}
# event_limit = request["config"].get("event_limit")
attribute = request["attribute"]
proxy_settings = get_proxy_settings(request.get("config"))
try:
client = vysion.Client(
api_key=request["config"]["apikey"],
headers={
"x-tool": "MISPModuleVysionExpansion",
},
proxy=proxy_settings["http"] if proxy_settings else None,
)
LOGGER.debug(attribute)
misp_attribute = MISPAttribute()
misp_attribute.from_dict(**attribute)
attribute_type = misp_attribute.type
attribute_value = misp_attribute.value
# https://www.misp-project.org/datamodels/#types
LOGGER.debug(attribute_type)
result = None
if attribute_type == "email":
result = client.find_email(attribute_value)
elif attribute_type == "domain":
result = client.search(attribute_value)
elif attribute_type == "url":
result = client.search(
attribute_value
) # TODO result = client.find_url(attribute_value)
elif attribute_type == "text":
result = client.search(attribute_value)
elif attribute_type == "target-org":
result = client.search(attribute_value, exact=True)
elif attribute_type == "btc":
result = client.search(attribute_value) # TODO
elif attribute_type == "phone-number":
result = client.search(attribute_value) # TODO
if result is None:
return {"results": {}}
elif isinstance(result, dto.VysionError):
LOGGER.error(str(result))
return {"results": {}}
p = MISPProcessor()
misp_event: MISPEvent = p.process(result, ref_attribute=misp_attribute)
LOGGER.info("Vysion client initialized")
LOGGER.info("Vysion result obtained")
return {
"results": {
"Object": [
json.loads(object.to_json()) for object in misp_event.objects
],
"Attribute": [
json.loads(attribute.to_json())
for attribute in misp_event.attributes
],
"Tag": [
json.loads(tag.to_json())
for tag in misp_event.tags
]
}
}
except vysion.APIError as ex:
LOGGER.error("Error in Vysion")
LOGGER.error(ex)
misperrors["error"] = ex.message
return misperrors
def introspection():
return mispattributes
def version():
moduleinfo["config"] = moduleconfig
return moduleinfo

View File

@ -0,0 +1,222 @@
import json
import requests
misperrors = {'error': 'Error'}
mispattributes = {
'input': ['domain'],
'output': ['domain', 'dns-soa-email',
'whois-registrant-email', 'whois-registrant-phone',
'whois-registrant-name',
'whois-registrar', 'whois-creation-date', 'domain']
}
moduleinfo = {'version': '1', 'author': 'WhoisFreaks',
'description': 'Query on whoisfreaks.com',
'module-type': ['expansion', 'hover']}
# config fields that your code expects from the site admin
moduleconfig = ['apikey']
def handler(q=False):
if q:
request = json.loads(q)
if 'config' not in request or (not (request['config'].get('apikey') or ('apiKey' in request['config']))):
misperrors['error'] = 'WhoisFreaks authentication is missing' + request
return misperrors
apiKey = request['config'].get('apikey')
if request.get('domain'):
domain = request['domain']
return handle_domain(apiKey, domain, misperrors)
else:
misperrors['error'] = "Unsupported attributes types"
return misperrors
else:
return False
def handle_domain(apiKey, domain, errors):
result_filtered = {"results": []}
r, status_ok = expand_whois(apiKey, domain)
if status_ok:
if r:
result_filtered['results'].extend(r)
r, status_ok = expand_dns(apiKey, domain)
if status_ok:
if r:
result_filtered['results'].extend(r)
return result_filtered
def expand_whois(apiKey, domain):
r = []
ns_servers = []
status_ok = False
try:
results = get_whois_response(domain, apiKey)
if results:
status_ok = True
if 'create_date' in results:
r.append(
{
'types': ['whois-creation-date'],
'values': [results['create_date']],
'categories': ['Attribution'],
'comment': 'Creation Date for %s by whoisFreaks'
% domain
}
)
if 'domain_registrar' in results:
if 'registrar_name' in results['domain_registrar']:
r.append(
{
'types': ['whois-registrant-name'],
'values': [results['domain_registrar']['registrar_name']],
'categories': ['Attribution'],
'comment': 'Whois information of %s by whoisFreaks'
% domain
}
)
if 'email_address' in results['domain_registrar']:
r.append(
{
'types': ['whois-registrant-email'],
'values': [results['domain_registrar']['email_address']],
'categories': ['Attribution'],
'comment': 'Whois information of %s by whoisFreaks'
% domain
}
)
if 'phone_number' in results['domain_registrar']:
r.append(
{
'types': ['whois-registrant-email'],
'values': [results['domain_registrar']['phone_number']],
'categories': ['Attribution'],
'comment': 'Whois information of %s by whoisFreaks'
% domain
}
)
if 'name_servers' in results:
ns_servers = results['name_servers']
r.append(
{
'types': ['domain'],
'values': ns_servers,
'categories': ['Attribution'],
'comment': 'Name server for %s by whoisFreaks'
% domain
}
)
except Exception:
misperrors['error'] = "Error while processing Whois Data"
return [], False
return r, status_ok
def expand_dns(apiKey, domain):
r = []
status_ok = False
list_ipv4 = []
list_ipv6 = []
servers_mx = []
soa_hostnames = []
try:
results = get_dns_response(domain, apiKey)
if results:
status_ok = True
if 'dnsRecords' in results:
dns_records = results['dnsRecords']
for record in dns_records:
if record['dnsType'] == 'A':
list_ipv4.append(record['address'])
elif record['dnsType'] == 'AAAA':
list_ipv6.append(record['address'])
elif record['dnsType'] == 'MX':
servers_mx.append(record['target'])
elif record['dnsType'] == 'SOA':
soa_hostnames.append(record['host'])
if list_ipv4:
r.append({'types': ['domain|ip'],
'values': ['%s|%s' % (domain, ipv4) for ipv4 in
list_ipv4],
'categories': ['Network activity'],
'comment': 'ipv4 of %s ' %
domain
})
if list_ipv6:
r.append({'types': ['domain|ip'],
'values': ['%s|%s' % (domain, ipv6) for ipv6 in
list_ipv6],
'categories': ['Network activity'],
'comment': 'ipv6 of %s' %
domain
})
if servers_mx:
r.append({'types': ['domain'],
'values': servers_mx,
'categories': ['Network activity'],
'comment': 'mx of %s' %
domain
})
if soa_hostnames:
r.append({'types': ['domain'],
'values': soa_hostnames,
'categories': ['Network activity'],
'comment': 'soa hostname of %s' %
domain
})
except Exception:
misperrors['error'] = "Error while processing Whois Data"
return [], False
return r, status_ok
def get_whois_response(domain, apiKey):
query = requests.get(
f"https://api.whoisfreaks.com/v1.0/whois?apiKey={apiKey}&whois=live&domainName={domain}"
)
if query.status_code != 200 and query.status_code != 206:
return {'error': f'Error while querying whoisfreaks.com - {query.status_code}: {query.reason}'}
return query.json()
def get_dns_response(domain, apiKey):
query = requests.get(
f"https://api.whoisfreaks.com/v1.0/dns/live?apiKey={apiKey}&domainName={domain}&type=SOA,AAAA,A,MX"
)
if query.status_code != 200 and query.status_code != 206:
return {'error': f'Error while querying whoisfreaks.com - {query.status_code}: {query.reason}'}
return query.json()
def introspection():
return mispattributes
def version():
moduleinfo['config'] = moduleconfig
return moduleinfo

View File

@ -8,7 +8,7 @@ import json
misperrors = {"error": "Error"}
types_to_use = ['sha256', 'sha1', 'md5', 'domain', 'ip', 'url']
types_to_use = ['sha256', 'sha1', 'md5', 'domain', 'ip-src', 'ip-dst', 'url']
userConfig = {
@ -26,38 +26,48 @@ moduleinfo = {'version': '1.1', 'author': 'Julien Bachmann, Hacknowledge, Maik W
def handle_sha256(value, period):
query = f"""find in (DeviceAlertEvents, DeviceFileEvents, DeviceImageLoadEvents, DeviceProcessEvents)
where SHA256 == '{value}' or InitiatingProcessSHA1 == '{value}'"""
query = f"""find in (DeviceEvents, DeviceAlertEvents,AlertInfo, AlertEvidence, DeviceFileEvents, DeviceImageLoadEvents, DeviceProcessEvents)
where (SHA256 == '{value}' or InitiatingProcessSHA1 == '{value}') and
Timestamp between(ago({period}) .. now())"""
return query.replace('\n', ' ')
def handle_sha1(value, period):
query = f"""find in (DeviceAlertEvents, DeviceFileEvents, DeviceImageLoadEvents, DeviceProcessEvents)
where SHA1 == '{value}' or InitiatingProcessSHA1 == '{value}'"""
query = f"""find in (DeviceEvents, DeviceAlertEvents, AlertInfo, AlertEvidence, DeviceFileEvents, DeviceImageLoadEvents, DeviceProcessEvents)
where (SHA1 == '{value}' or InitiatingProcessSHA1 == '{value}') and
Timestamp between(ago({period}) .. now())"""
return query.replace('\n', ' ')
def handle_md5(value, period):
query = f"""find in (DeviceAlertEvents, DeviceFileEvents, DeviceImageLoadEvents, DeviceProcessEvents)
where MD5 == '{value}' or InitiatingProcessMD5 == '{value}'"""
query = f"""find in (DeviceEvents, DeviceAlertEvents, AlertInfo, AlertEvidence, DeviceFileEvents, DeviceImageLoadEvents, DeviceProcessEvents)
where (MD5 == '{value}' or InitiatingProcessMD5 == '{value}') and
Timestamp between(ago({period}) .. now())"""
return query.replace('\n', ' ')
def handle_domain(value, period):
query = f"""find in (DeviceAlertEvents, DeviceNetworkEvents)
where RemoteUrl contains '{value}'"""
query = f"""find in (DeviceAlertEvents, AlertInfo, AlertEvidence, DeviceNetworkEvents)
where RemoteUrl contains '{value}' and
Timestamp between(ago({period}) .. now())"""
return query.replace('\n', ' ')
def handle_ip(value, period):
query = f"""find in (DeviceAlertEvents, DeviceNetworkEvents)
where RemoteIP == '{value}'"""
query = f"""find in (DeviceAlertEvents, AlertInfo, AlertEvidence, DeviceNetworkEvents)
where RemoteIP == '{value}' and
Timestamp between(ago({period}) .. now())"""
return query.replace('\n', ' ')
def handle_url(value, period):
query = f"""find in (DeviceAlertEvents, DeviceNetworkEvents)
where RemoteUrl startswith '{value}'"""
query = f"""let url = '{value}';
search in (EmailUrlInfo,UrlClickEvents,DeviceNetworkEvents,DeviceFileEvents,DeviceEvents,BehaviorEntities, AlertInfo, AlertEvidence, DeviceAlertEvents)
Timestamp between(ago({period}) .. now()) and
RemoteUrl has url
or FileOriginUrl has url
or FileOriginReferrerUrl has url
or Url has url"""
return query.replace('\n', ' ')
@ -65,8 +75,9 @@ handlers = {
'sha256': handle_sha256,
'sha1': handle_sha1,
'md5': handle_md5,
'domain': handle_domain,
'ip': handle_ip,
'domain': handle_url,
'ip-src': handle_ip,
'ip-dst': handle_ip,
'url': handle_url
}

View File

@ -75,8 +75,8 @@ def create_collection(api_key, event_data):
response_data = response.json()
if response.status_code == 200:
link = response_data['data']['links']['self']
return f'{uuid}: {link}'
col_id = response_data['data']['id']
return f'{uuid}: https://www.virustotal.com/gui/collection/{col_id}/iocs'
error = response_data['error']['message']
if response.status_code == 400:

View File

@ -1,10 +1,6 @@
# -*- coding: utf-8 -*-
from pymisp import MISPEvent, MISPObject
from pymisp import __path__ as pymisp_path
import csv
import io
import json
import os
import base64
misperrors = {'error': 'Error'}
@ -33,7 +29,7 @@ misp_context_additional_fields = ['event_info', 'event_member_org', 'event_sourc
misp_extended_csv_header = misp_standard_csv_header + misp_context_additional_fields
class CsvParser():
class CsvParser:
def __init__(self, header, has_header, delimiter, data, from_misp, MISPtypes, categories):
self.misp_event = MISPEvent()
self.header = header
@ -77,7 +73,7 @@ class CsvParser():
return {'error': 'In order to import MISP objects, an object relation for each attribute contained in an object is required.'}
self.__build_misp_event(attribute_indexes, object_indexes)
else:
attribute_fields = attribute_fields = misp_standard_csv_header[:1] + misp_standard_csv_header[2:9]
attribute_fields = misp_standard_csv_header[:1] + misp_standard_csv_header[2:9]
attribute_indexes = []
types_indexes = []
for i in range(len(self.header)):
@ -236,7 +232,7 @@ class CsvParser():
return score
def __finalize_results(self):
event = json.loads(self.misp_event.to_json())
event = self.misp_event.to_dict()
self.results = {key: event[key] for key in ('Attribute', 'Object') if (key in event and event[key])}
@ -252,10 +248,7 @@ def __standard_parsing(data):
return list(tuple(part.strip() for part in line) for line in csv.reader(io.TextIOWrapper(io.BytesIO(data.encode()), encoding='utf-8')) if line and not line[0].startswith('#'))
def handler(q=False):
if q is False:
return False
request = json.loads(q)
def dict_handler(request: dict):
if request.get('data'):
try:
data = base64.b64decode(request['data']).decode('utf-8')
@ -282,12 +275,11 @@ def handler(q=False):
del data[0]
if header == misp_standard_csv_header or header == misp_extended_csv_header:
header = misp_standard_csv_header
descFilename = os.path.join(pymisp_path[0], 'data/describeTypes.json')
with open(descFilename, 'r') as f:
description = json.loads(f.read())['result']
MISPtypes = description['types']
description = MISPEvent().describe_types
misp_types = description['types']
for h in header:
if not any((h in MISPtypes, h in misp_extended_csv_header, h in ('', ' ', '_', 'object_id'))):
if not any((h in misp_types, h in misp_extended_csv_header, h in ('', ' ', '_', 'object_id'))):
misperrors['error'] = 'Wrong header field: {}. Please use a header value that can be recognized by MISP (or alternatively skip it using a whitespace).'.format(h)
return misperrors
from_misp = all((h in misp_extended_csv_header or h in ('', ' ', '_', 'object_id') for h in header))
@ -300,7 +292,7 @@ def handler(q=False):
wrong_types = tuple(wrong_type for wrong_type in ('type', 'value') if wrong_type in header)
misperrors['error'] = 'Error with the following header: {}. It contains the following field(s): {}, which is(are) already provided by the usage of at least on MISP attribute type in the header.'.format(header, 'and'.join(wrong_types))
return misperrors
csv_parser = CsvParser(header, has_header, delimiter, data, from_misp, MISPtypes, description['categories'])
csv_parser = CsvParser(header, has_header, delimiter, data, from_misp, misp_types, description['categories'])
# build the attributes
result = csv_parser.parse_csv()
if 'error' in result:

View File

@ -1,6 +1,4 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import base64
import zipfile
@ -33,12 +31,7 @@ moduleconfig = ["unzip_attachments",
"extract_urls"]
def handler(q=False):
if q is False:
return False
# Decode and parse email
request = json.loads(q)
def dict_handler(request: dict):
# request data is always base 64 byte encoded
data = base64.b64decode(request["data"])
@ -51,18 +44,18 @@ def handler(q=False):
# Do we unzip attachments we find?
unzip = config.get("unzip_attachments", None)
if (unzip is not None and unzip.lower() in acceptable_config_yes):
if unzip is not None and unzip.lower() in acceptable_config_yes:
unzip = True
# Do we try to find passwords for protected zip files?
zip_pass_crack = config.get("guess_zip_attachment_passwords", None)
if (zip_pass_crack is not None and zip_pass_crack.lower() in acceptable_config_yes):
if zip_pass_crack is not None and zip_pass_crack.lower() in acceptable_config_yes:
zip_pass_crack = True
password_list = get_zip_passwords(email_object.email)
# Do we extract URL's from the email.
extract_urls = config.get("extract_urls", None)
if (extract_urls is not None and extract_urls.lower() in acceptable_config_yes):
if extract_urls is not None and extract_urls.lower() in acceptable_config_yes:
extract_urls = True
file_objects = [] # All possible file objects
@ -81,12 +74,12 @@ def handler(q=False):
# Attempt to unzip the attachment and return its files
if unzip and temp_filename.suffix[1:] not in zipped_files:
try:
unzip_attachement(attachment_name, attachment, email_object, file_objects)
unzip_attachment(attachment_name, attachment, email_object, file_objects)
except RuntimeError: # File is encrypted with a password
if zip_pass_crack is True:
password = test_zip_passwords(attachment, password_list)
if password:
unzip_attachement(attachment_name, attachment, email_object, file_objects, password)
unzip_attachment(attachment_name, attachment, email_object, file_objects, password)
else: # Inform the analyst that we could not crack password
f_object, main_object, sections = make_binary_objects(pseudofile=attachment, filename=attachment_name, standalone=False)
f_object.comment = "Encrypted Zip: Password could not be cracked from message"
@ -125,14 +118,14 @@ def handler(q=False):
file_objects.append(url_object)
email_object.add_reference(url_object.uuid, 'includes', 'URL in email body')
objects = [email_object.to_json()]
objects = [email_object.to_dict()]
if file_objects:
objects += [o.to_json() for o in file_objects if o]
r = {'results': {'Object': [json.loads(o) for o in objects]}}
objects += [o.to_dict() for o in file_objects if o]
r = {'results': {'Object': objects}}
return r
def unzip_attachement(filename, data, email_object, file_objects, password=None):
def unzip_attachment(filename, data, email_object, file_objects, password=None):
"""Extract the contents of a zipfile.
Args:
@ -289,4 +282,4 @@ def version():
if __name__ == '__main__':
with open('tests/test_no_attach.eml', 'r') as email_file:
handler(q=email_file.read())
dict_handler(json.loads(email_file.read()))

View File

@ -28,15 +28,23 @@ extra:
- icon: fontawesome/brands/github-alt
link: https://github.com/MISP
plugins:
- tags
theme:
name: material
palette:
primary: 'white'
accent: 'blue'
scheme: default
language: en
favicon: img/favicon.ico
logo: img/misp.png
feature:
- navigation.tabs
- navigation.tracking
- search.highlight
- search.share
# Extensions
markdown_extensions:

View File

@ -28,12 +28,15 @@ class TestExpansions(unittest.TestCase):
return requests.post(urljoin(self.url, "query"), json=query)
@staticmethod
def get_attribute(response):
def get_attribute_types(response):
data = response.json()
if not isinstance(data, dict):
print(json.dumps(data, indent=2))
return data
return data['results']['Attribute'][0]['type']
types = []
for attribute in data['results']['Attribute']:
types.append(attribute['type'])
return types
@staticmethod
def get_data(response):
@ -52,7 +55,18 @@ class TestExpansions(unittest.TestCase):
return data['error']
@staticmethod
def get_object(response):
def get_object_types(response):
data = response.json()
if not isinstance(data, dict):
print(json.dumps(data, indent=2))
return data
names = []
for obj in data['results']['Object']:
names.append(obj['name'])
return names
@staticmethod
def get_first_object_type(response):
data = response.json()
if not isinstance(data, dict):
print(json.dumps(data, indent=2))
@ -74,12 +88,15 @@ class TestExpansions(unittest.TestCase):
return data['results'][0]['values']
def test_apiosintds(self):
query = {'module': 'apiosintds', 'ip-dst': '185.255.79.90'}
self.skipTest("apiosintds is probably broken")
query = {'module': 'apiosintds', 'ip-dst': '10.10.10.10'}
response = self.misp_modules_post(query)
try:
self.assertTrue(self.get_values(response).startswith('185.255.79.90 IS listed by OSINT.digitalside.it.'))
self.assertTrue(self.get_values(response).startswith('IoC 10.10.10.10'))
except AssertionError:
self.assertTrue(self.get_values(response).startswith('185.255.79.90 IS NOT listed by OSINT.digitalside.it.'))
self.assertTrue(self.get_values(response).startswith('10.10.10.10 IS NOT listed by OSINT.digitalside.it.'))
def test_apivoid(self):
module_name = "apivoid"
@ -92,7 +109,7 @@ class TestExpansions(unittest.TestCase):
query['config'] = self.configs[module_name]
response = self.misp_modules_post(query)
try:
self.assertEqual(self.get_object(response), 'dns-record')
self.assertEqual(self.get_first_object_type(response), 'dns-record')
except Exception:
self.assertTrue(self.get_errors(response).startswith('You do not have enough APIVoid credits'))
else:
@ -109,7 +126,7 @@ class TestExpansions(unittest.TestCase):
}
}
response = self.misp_modules_post(query)
self.assertEqual(self.get_object(response), 'asn')
self.assertEqual(self.get_first_object_type(response), 'asn')
def test_btc_steroids(self):
if LiveCI:
@ -139,7 +156,7 @@ class TestExpansions(unittest.TestCase):
query['config'] = self.configs[module_name]
response = self.misp_modules_post(query)
try:
self.assertEqual(self.get_object(response), 'passive-dns')
self.assertEqual(self.get_first_object_type(response), 'passive-dns')
except Exception:
self.assertTrue(self.get_errors(response).startswith('There is an authentication error'))
else:
@ -157,7 +174,7 @@ class TestExpansions(unittest.TestCase):
query['config'] = self.configs[module_name]
response = self.misp_modules_post(query)
try:
self.assertEqual(self.get_object(response), 'x509')
self.assertEqual(self.get_first_object_type(response), 'x509')
except Exception:
self.assertTrue(self.get_errors(response).startswith('There is an authentication error'))
else:
@ -187,7 +204,7 @@ class TestExpansions(unittest.TestCase):
"config": {}}
response = self.misp_modules_post(query)
try:
self.assertEqual(self.get_object(response), 'vulnerability')
self.assertEqual(self.get_first_object_type(response), 'vulnerability')
except Exception:
print(self.get_errors(response))
@ -198,7 +215,7 @@ class TestExpansions(unittest.TestCase):
self.assertEqual(self.get_values(response), 'totalmateria.net - spam test domain')
except Exception:
try:
self.assertTrue(self.get_values(response).startswith('None of DNS query names exist:'))
self.assertTrue(self.get_values(response).startswith('The DNS query name does not exist:'))
except Exception:
self.assertEqual(self.get_errors(response), 'Not able to reach dbl.spamhaus.org or something went wrong')
@ -306,7 +323,7 @@ class TestExpansions(unittest.TestCase):
"value": "149.13.33.14",
"uuid": "ea89a33b-4ab7-4515-9f02-922a0bee333d"}}
response = self.misp_modules_post(query)
self.assertEqual(self.get_object(response), 'asn')
self.assertEqual(self.get_first_object_type(response), 'asn')
def test_ipqs_fraud_and_risk_scoring(self):
module_name = "ipqs_fraud_and_risk_scoring"
@ -505,7 +522,7 @@ class TestExpansions(unittest.TestCase):
if module_name in self.configs:
query['config'] = self.configs[module_name]
response = self.misp_modules_post(query)
self.assertEqual(self.get_object(response), 'ip-api-address')
self.assertEqual(self.get_first_object_type(response), 'ip-api-address')
else:
response = self.misp_modules_post(query)
self.assertEqual(self.get_errors(response), 'Shodan authentication is missing')
@ -577,21 +594,25 @@ class TestExpansions(unittest.TestCase):
query_values = ('www.bestwpdesign.com', '79.118.195.239',
'a04ac6d98ad989312783d4fe3456c53730b212c79a426fb215708b6c6daa3de3',
'http://79.118.195.239:1924/.i')
results = ('url', 'url', 'virustotal-report', 'virustotal-report')
results = ('url', 'url', 'file', 'virustotal-report')
for query_type, query_value, result in zip(query_types[:2], query_values[:2], results[:2]):
query = {"module": "urlhaus",
"attribute": {"type": query_type,
"value": query_value,
"uuid": "ea89a33b-4ab7-4515-9f02-922a0bee333d"}}
response = self.misp_modules_post(query)
self.assertEqual(self.get_attribute(response), result)
print(response.json())
self.assertIn(result, self.get_attribute_types(response))
for query_type, query_value, result in zip(query_types[2:], query_values[2:], results[2:]):
query = {"module": "urlhaus",
"attribute": {"type": query_type,
"value": query_value,
"uuid": "ea89a33b-4ab7-4515-9f02-922a0bee333d"}}
response = self.misp_modules_post(query)
self.assertEqual(self.get_object(response), result)
print(response.json())
self.assertIn(result, self.get_object_types(response))
def test_urlscan(self):
module_name = "urlscan"
@ -636,7 +657,7 @@ class TestExpansions(unittest.TestCase):
"config": self.configs[module_name]}
response = self.misp_modules_post(query)
try:
self.assertEqual(self.get_object(response), result)
self.assertEqual(self.get_first_object_type(response), result)
except Exception:
self.assertEqual(self.get_errors(response), "VirusTotal request rate limit exceeded.")
else:
@ -679,7 +700,7 @@ class TestExpansions(unittest.TestCase):
"config": self.configs[module_name]}
response = self.misp_modules_post(query)
try:
self.assertEqual(self.get_object(response), result)
self.assertEqual(self.get_first_object_type(response), result)
except Exception:
self.assertEqual(self.get_errors(response), "VirusTotal request rate limit exceeded.")
else:
@ -725,7 +746,7 @@ class TestExpansions(unittest.TestCase):
"uuid": "ea89a33b-4ab7-4515-9f02-922a0bee333d"},
"config": self.configs[module_name]}
response = self.misp_modules_post(query)
self.assertEqual(self.get_object(response), result)
self.assertEqual(self.get_first_object_type(response), result)
else:
query = {"module": module_name,
"attribute": {"type": query_types[0],