new: [yara_export] new export module

pull/681/head
Christophe Vandeplas 2024-08-09 15:53:27 +02:00
parent dd3ac91afd
commit 20ec7c8a18
No known key found for this signature in database
GPG Key ID: BDC48619FFDC5A5B
13 changed files with 493 additions and 117 deletions

View File

@ -18,7 +18,7 @@ jobs:
steps:
- name: Install packages
run: |
sudo apt-get install libpoppler-cpp-dev libzbar0 tesseract-ocr
sudo apt-get install libpoppler-cpp-dev libzbar0 tesseract-ocr yara
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5

View File

@ -2357,6 +2357,25 @@ This module is used to create a VirusTotal Graph from a MISP event.
-----
#### [yara_export](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/export_mod/yara_export.py)
<img src=logos/yara.png height=60>
This module is used to export MISP events to YARA.
- **features**:
>The module will dynamically generate YARA rules for attributes that are marked as to IDS. Basic metadata about the event is added to the rule.
>Attributes that are already YARA rules are also exported, with a rewritten rule name.
- **input**:
>Attributes and Objects.
- **output**:
>A YARA file that can be used with the YARA scanning tool.
- **references**:
>https://virustotal.github.io/yara/
- **requirements**:
>yara-python python library
-----
## Import Modules
#### [cof2misp](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/import_mod/cof2misp.py)

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
import os
import json
import sys
from pathlib import Path
module_types = ['expansion', 'export_mod', 'import_mod']

View File

@ -245,3 +245,22 @@ This module is used to create a VirusTotal Graph from a MISP event.
>vt_graph_api, the python library to query the VirusTotal graph API
-----
#### [yara_export](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/export_mod/yara_export.py)
<img src=../logos/yara.png height=60>
This module is used to export MISP events to YARA.
- **features**:
>The module will dynamically generate YARA rules for attributes that are marked as to IDS. Basic metadata about the event is added to the rule.
>Attributes that are already YARA rules are also exported, with a rewritten rule name.
- **input**:
>Attributes and Objects.
- **output**:
>A YARA file that can be used with the YARA scanning tool.
- **references**:
>https://virustotal.github.io/yara/
- **requirements**:
>yara-python python library
-----

View File

@ -0,0 +1,13 @@
{
"description": "This module is used to export MISP events to YARA.",
"logo": "yara.png",
"requirements": [
"yara-python python library"
],
"features": "The module will dynamically generate YARA rules for attributes that are marked as to IDS. Basic metadata about the event is added to the rule.\nAttributes that are already YARA rules are also exported, with a rewritten rule name.",
"references": [
"https://virustotal.github.io/yara/"
],
"input": "Attributes and Objects.",
"output": "A YARA file that can be used with the YARA scanning tool."
}

View File

@ -0,0 +1,281 @@
import json
import base64
import re
try:
import yara
except (OSError, ImportError):
print("yara is missing, use 'pip3 install -I -r REQUIREMENTS' from the root of this repository to install it.")
misperrors = {'error': 'Error'}
userConfig = {
}
moduleconfig = []
# fixed for now, options in the future:
# event, attribute, event-collection, attribute-collection
inputSource = ['event']
outputFileExtension = 'yara'
responseType = 'text/plain'
moduleinfo = {'version': '0.1', 'author': 'Christophe Vandeplas',
'description': 'Yara export module',
'module-type': ['export']}
class YaraRule():
def __init__(self, name):
self.name = name
self.strings = {}
self.conditions = []
self.meta = {}
def add_string(self, type_: str, s: str):
type_clean = ''.join(c if c.isalnum() or c == '_' else '_' for c in type_)
if type_clean not in self.strings:
self.strings[type_clean] = []
self.strings[type_clean].append(s)
def add_condition(self, condition: str):
self.conditions.append(condition)
def add_meta(self, key: str, value: str):
if key not in self.meta:
self.meta[key] = []
self.meta[key].append(value)
def __str__(self):
if len(self.strings) == 0 and len(self.conditions) == 0:
return "\n" # no strings, so no rule
result = []
result.append(f"rule {self.name} {{")
result.append(" meta:")
for key, values in self.meta.items():
i = 0
if len(values) == 1:
result.append(f" {key} = \"{values[0]}\"")
continue
for value in values:
result.append(f" {key}_{i} = \"{value}\"")
i += 1
result.append(" strings:")
for key, values in self.strings.items():
i = 0
for value in values:
result.append(f" ${key}_{i} = \"{value}\"")
i += 1
result.append(" condition:")
if len(self.conditions) == 0:
result.append(" any of them")
for condition in self.conditions:
result.append(f" {condition}")
result.append("}")
result.append("")
return '\n'.join(result)
def handle_string(yara_rules: list, yr: YaraRule, attribute: dict):
if not attribute['to_ids']: # skip non IDS attributes
return
yr.add_string(attribute['type'], attribute['value'])
return
def handle_combined(yara_rules: list, yr: YaraRule, attribute: dict):
if not attribute['to_ids']: # skip non IDS attributes
return
type_1, type_2 = attribute['type'].split('|')
value_1, value_2 = attribute['value'].split('|')
try:
handlers[type_1](yara_rules, yr, type_1, value_1)
except KeyError:
# ignore unsupported types
pass
try:
handlers[type_2](yara_rules, yr, type_2, value_2)
except KeyError:
# ignore unsupported types
pass
def handle_yara(yara_rules: list, yr: YaraRule, attribute):
# do not check for to_ids, as we want to always export the Yara rule
# split out as a separate rule, and rewrite the rule name
value = re.sub('^[ \t]*rule ', 'rule MISP_e{}_'.format(attribute['event_id']), attribute['value'], flags=re.MULTILINE)
# cleanup dirty stuff from people
substitutions = (('', '"'),
('', '"'),
('', '"'),
('`', "'"),
('\r', ''),
('Rule ', 'rule ') # some people write this with the wrong case
# ('$ ', '$'), # this breaks rules
# ('\t\t', '\n'), # this breaks rules
)
for substitution in substitutions:
if substitution[0] in value:
value = value.replace(substitution[0], substitution[1])
# we may ignore any global rules as they might disable everything
# on the other hand we're only processing one event...
# if 'global rule' in value:
# return
# private rules need some more rewriting
if 'private rule' in value:
priv_rules = re.findall(r'private rule (\w+)', value, flags=re.MULTILINE)
for priv_rule in priv_rules:
value = re.sub(priv_rule, 'MISP_e{}_{}'.format(attribute['event_id'], priv_rule), value, flags=re.MULTILINE)
# compile the yara rule to confirm it's validity
try:
yara.compile(source=value)
except yara.SyntaxError:
return
except yara.Error:
return
# all checks done, add the rule
yara_rules.append(value)
return
def handle_malware_sample(yara_rules: list, yr: YaraRule, attribute):
if not attribute['to_ids']: # skip non IDS attributes
return
handle_combined(yara_rules, yr, 'filename|md5', attribute['value'])
def handle_meta(yara_rules: list, yr: YaraRule, attribute):
yr.add_meta(attribute['type'], attribute['value'])
return
handlers = {
'yara': handle_yara,
'hostname': handle_string,
'hostname|port': handle_combined,
'domain': handle_string,
'domain|ip': handle_combined,
'ip': handle_string,
'ip-src': handle_string,
'ip-dst': handle_string,
'ip-dst|port': handle_combined, # we could also handle_string, which would be more specific. Less false positives, but less true positives too...
'ip-src|port': handle_combined,
'url': handle_string,
'email': handle_string,
'email-src': handle_string,
'email-dst': handle_string,
'email-subject': handle_string,
'email-attachment': handle_string,
'email-header': handle_string,
'email-reply-to': handle_string,
'email-x-mailer': handle_string,
'email-mime-boundary': handle_string,
'email-thread-index': handle_string,
'email-message-id': handle_string,
'filename': handle_string,
'filename|md5': handle_combined,
'filename|sha1': handle_combined,
'filename|sha256': handle_combined,
'filename|authentihash': handle_combined,
'filename|vhash': handle_combined,
'filename|ssdeep': handle_combined,
'filename|imphash': handle_combined,
'filename|impfuzzy': handle_combined,
'filename|pehash': handle_combined,
'filename|sha224': handle_combined,
'filename|sha384': handle_combined,
'filename|sha512': handle_combined,
'filename|sha512/224': handle_combined,
'filename|sha512/256': handle_combined,
'filename|sha3-224': handle_combined,
'filename|sha3-256': handle_combined,
'filename|sha3-384': handle_combined,
'filename|sha3-512': handle_combined,
'filename|tlsh': handle_combined,
'malware-sample': handle_malware_sample,
'pattern-in-file': handle_string,
'pattern-in-traffic': handle_string,
'pattern-in-memory': handle_string,
'link': handle_meta
}
# auto-generate the list of types to use
types_to_use = handlers.keys()
def handler(q=False):
if q is False:
return False
request = json.loads(q)
yara_rules = []
for event in request["data"]:
event_info_clean = ''.join(c if c.isalnum() or c == '_' else '_' for c in event['Event']['info'])
yr = YaraRule(f"MISP_e{event['Event']['id']}_{event_info_clean}")
yr.add_meta('description', event['Event']['info'])
yr.add_meta('author', f"MISP - {event['Orgc']['name']}")
yr.add_meta('misp_event_date', event['Event']['date'])
yr.add_meta('misp_event_id', event['Event']['id'])
yr.add_meta('misp_event_uuid', event['Event']['uuid'])
for attribute in event.get("Attribute", []):
try:
handlers[attribute['type']](yara_rules, yr, attribute)
except KeyError:
# ignore unsupported types
pass
for obj in event.get("Object", []):
for attribute in obj["Attribute"]:
try:
handlers[attribute['type']](yara_rules, yr, attribute)
except KeyError:
# ignore unsupported types
pass
yara_rules.append(str(yr))
r = {"response": [], "data": str(base64.b64encode(bytes('\n'.join(yara_rules), 'utf-8')), 'utf-8')}
return r
def introspection():
modulesetup = {}
try:
responseType
modulesetup['responseType'] = responseType
except NameError:
pass
try:
userConfig
modulesetup['userConfig'] = userConfig
except NameError:
pass
try:
outputFileExtension
modulesetup['outputFileExtension'] = outputFileExtension
except NameError:
pass
try:
inputSource
modulesetup['inputSource'] = inputSource
except NameError:
pass
return modulesetup
def version():
moduleinfo['config'] = moduleconfig
return moduleinfo

View File

@ -10,6 +10,7 @@ import os
LiveCI = True
class TestExpansions(unittest.TestCase):
def setUp(self):
@ -235,10 +236,10 @@ class TestExpansions(unittest.TestCase):
def test_censys(self):
module_name = "censys_enrich"
query = {
"attribute": {"type" : "ip-dst", "value": "8.8.8.8", "uuid": ""},
"module": module_name,
"config": {}
}
"attribute": {"type": "ip-dst", "value": "8.8.8.8", "uuid": ""},
"module": module_name,
"config": {}
}
if module_name in self.configs:
query['config'] = self.configs[module_name]
response = self.misp_modules_post(query)
@ -340,7 +341,6 @@ class TestExpansions(unittest.TestCase):
response = self.misp_modules_post(query)
self.assertEqual(self.get_errors(response), 'IPQualityScore apikey is missing')
def test_macaddess_io(self):
module_name = 'macaddress_io'
query = {"module": module_name, "mac-address": "44:38:39:ff:ef:57"}
@ -778,10 +778,3 @@ class TestExpansions(unittest.TestCase):
query = {"module": "yara_syntax_validator", "yara": 'import "hash"\r\nrule MD5 {\r\n\tcondition:\r\n\t\thash.md5(0, filesize) == "b2a5abfeef9e36964281a31e17b57c97"\r\n}'}
response = self.misp_modules_post(query)
self.assertEqual(self.get_values(response), 'Syntax valid')
@unittest.skip("Not developed yet")
def test_yara_export(self):
query = {"module": "yara_export"}
response = self.misp_modules_post(query)
expected_result = ''
self.assertEqual(self.get_values(response), expected_result)

84
tests/test_exports.py Normal file
View File

@ -0,0 +1,84 @@
"""Test module for the ThreatConnect Export module"""
import base64
import csv
import io
import json
import os
import unittest
import requests
from urllib.parse import urljoin
class TestExports(unittest.TestCase):
"""Unittest module for export modules"""
def setUp(self):
self.headers = {'Content-Type': 'application/json'}
self.url = "http://127.0.0.1:6666/"
input_event_path = "%s/test_files/misp_event.json" % os.path.dirname(os.path.realpath(__file__))
with open(input_event_path, "r") as ifile:
self.event = json.load(ifile)
def misp_modules_post(self, query):
return requests.post(urljoin(self.url, "query"), headers=self.headers, json=query)
@staticmethod
def get_values(response):
data = response.json()
if 'data' in data:
return base64.b64decode(data['data']).decode("utf-8")
def test_introspection(self):
"""checks if all export modules are offered through the misp-modules service"""
try:
response = requests.get(self.url + "modules")
modules = [module["name"] for module in response.json()]
# list modules in the export_mod folder
export_mod_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', 'misp_modules', 'modules', "export_mod")
module_files = [file[:-3] for file in os.listdir(export_mod_path) if file.endswith(".py") if file != "__init__.py"]
for module in module_files:
self.assertIn(module, modules)
finally:
response.connection.close()
def test_threat_connect_export(self):
"""Test an event export"""
test_source = "Test Export"
query = {
"module": 'threat_connect_export',
"data": [self.event],
"config": {
"Default_Source": test_source
}
}
try:
response = self.misp_modules_post(query)
data = base64.b64decode(response.json()["data"]).decode("utf-8")
csvfile = io.StringIO(data)
reader = csv.DictReader(csvfile)
values = [field["Value"] for field in reader]
assert "google.com" in values
assert "127.0.0.1" in values
# resetting file pointer to read through again and extract sources
csvfile.seek(0)
# use a set comprehension to deduplicate sources
sources = {field["Source"] for field in reader}
assert test_source in sources
finally:
response.connection.close()
def test_yara_export(self):
query = {
"module": "yara_export",
"data": [self.event],
}
response = self.misp_modules_post(query)
expected_result = 'rule MISP_e625_MetadataExample\n{\n meta:\n my_identifier_1 = "Some string data"\n my_identifier_2 = 24\n my_identifier_3 = true\n\n strings:\n $my_text_string = "text here"\n $my_hex_string = { E2 34 A1 C8 23 FB }\n\n condition:\n $my_text_string or $my_hex_string\n}\n\n'
result = self.get_values(response)
self.assertEqual(result, expected_result)
if __name__ == "__main__":
unittest.main()

View File

@ -69,6 +69,22 @@
"value": "google.com|127.0.0.1",
"AttributeTag": [],
"ShadowAttribute": []
}, {
"id": "164192",
"type": "yara",
"category": "Artifacts dropped",
"to_ids": false,
"uuid": "59430251-e6a4-4900-b78b-060dc0a81112",
"event_id": "625",
"distribution": "5",
"timestamp": "1497563729",
"comment": "Test data",
"sharing_group_id": "0",
"deleted": false,
"disable_correlation": false,
"value": "rule MetadataExample\n{\n meta:\n my_identifier_1 = \"Some string data\"\n my_identifier_2 = 24\n my_identifier_3 = true\n\n strings:\n $my_text_string = \"text here\"\n $my_hex_string = { E2 34 A1 C8 23 FB }\n\n condition:\n $my_text_string or $my_hex_string\n}",
"AttributeTag": [],
"ShadowAttribute": []
}],
"ShadowAttribute": [],
"EventTag": [{

34
tests/test_yara.py Normal file
View File

@ -0,0 +1,34 @@
import json
import os
import unittest
import sys
try:
import yara
except (OSError, ImportError):
sys.exit("yara is missing, use 'pip3 install -I -r REQUIREMENTS' from the root of this repository to install it.")
class TestYara(unittest.TestCase):
"""Unittest module for yara related modules"""
def setUp(self):
self.headers = {'Content-Type': 'application/json'}
self.url = "http://127.0.0.1:6666/"
self.module = "threat_connect_export"
input_event_path = "%s/test_files/misp_event.json" % os.path.dirname(os.path.realpath(__file__))
with open(input_event_path, "r") as ifile:
self.event = json.load(ifile)
def test_install(self):
files = ['tests/yara_hash_module_test.yara', 'tests/yara_pe_module_test.yara']
for file_ in files:
try:
rule = yara.compile(file_)
self.assertIsInstance(rule, yara.Rules)
except Exception as e:
raise Exception("Error in file: {} with error: {}".format(file_, e))
if __name__ == "__main__":
unittest.main()

View File

@ -1,60 +0,0 @@
"""Test module for the ThreatConnect Export module"""
import base64
import csv
import io
import json
import os
import unittest
import requests
class TestModules(unittest.TestCase):
"""Unittest module for threat_connect_export.py"""
def setUp(self):
self.headers = {'Content-Type': 'application/json'}
self.url = "http://127.0.0.1:6666/"
self.module = "threat_connect_export"
input_event_path = "%s/test_files/misp_event.json" % os.path.dirname(os.path.realpath(__file__))
with open(input_event_path, "r") as ifile:
self.event = json.load(ifile)
def test_01_introspection(self):
"""Taken from test.py"""
try:
response = requests.get(self.url + "modules")
modules = [module["name"] for module in response.json()]
assert self.module in modules
finally:
response.connection.close()
def test_02_export(self):
"""Test an event export"""
test_source = "Test Export"
query = {
"module": self.module,
"data": [self.event],
"config": {
"Default_Source": test_source
}
}
try:
response = requests.post(self.url + "query", headers=self.headers, data=json.dumps(query))
data = base64.b64decode(response.json()["data"]).decode("utf-8")
csvfile = io.StringIO(data)
reader = csv.DictReader(csvfile)
values = [field["Value"] for field in reader]
assert "google.com" in values
assert "127.0.0.1" in values
# resetting file pointer to read through again and extract sources
csvfile.seek(0)
# use a set comprehension to deduplicate sources
sources = {field["Source"] for field in reader}
assert test_source in sources
finally:
response.connection.close()
if __name__ == "__main__":
unittest.main()

View File

@ -1,22 +0,0 @@
import sys
try:
import yara
except (OSError, ImportError):
sys.exit("yara is missing, use 'pip3 install -I -r REQUIREMENTS' from the root of this repository to install it.")
# Usage: python3 yara_test.py [yara files]
# with any yara file(s) in order to test if yara library is correctly installed.
# (it is also validating yara syntax)
#
# If no argument is given, this script takes the 2 yara test rules in the same directory
# in order to test if both yara modules we need work properly.
files = sys.argv[1:] if len(sys.argv) > 1 else ['yara_hash_module_test.yara', 'yara_pe_module_test.yara']
for file_ in files:
try:
yara.compile(file_)
status = "Valid syntax"
except Exception as e:
status = e
print("{}: {}".format(file_, status))