mirror of https://github.com/MISP/misp-modules
chg: [clamav] Add reference to original attribute
parent
0872bb820c
commit
7ad5eb0bfa
|
@ -5,6 +5,7 @@ import logging
|
||||||
import sys
|
import sys
|
||||||
import zipfile
|
import zipfile
|
||||||
import clamd
|
import clamd
|
||||||
|
from . import check_input_attribute, standard_error_message
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from pymisp import MISPEvent, MISPObject
|
from pymisp import MISPEvent, MISPObject
|
||||||
|
|
||||||
|
@ -19,7 +20,6 @@ sh.setFormatter(fmt)
|
||||||
log.addHandler(sh)
|
log.addHandler(sh)
|
||||||
|
|
||||||
moduleinfo = {
|
moduleinfo = {
|
||||||
"full_name": "ClamAV",
|
|
||||||
"version": "0.1",
|
"version": "0.1",
|
||||||
"author": "Jakub Onderka",
|
"author": "Jakub Onderka",
|
||||||
"description": "Submit file to ClamAV",
|
"description": "Submit file to ClamAV",
|
||||||
|
@ -32,13 +32,15 @@ mispattributes = {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def create_response(software: str, signature: Optional[str] = None) -> dict:
|
def create_response(original_attribute: dict, software: str, signature: Optional[str] = None) -> dict:
|
||||||
misp_event = MISPEvent()
|
misp_event = MISPEvent()
|
||||||
if signature:
|
if signature:
|
||||||
|
misp_event.add_attribute(**original_attribute)
|
||||||
|
|
||||||
av_signature_object = MISPObject("av-signature")
|
av_signature_object = MISPObject("av-signature")
|
||||||
av_signature_object.add_attribute("signature", signature)
|
av_signature_object.add_attribute("signature", signature)
|
||||||
av_signature_object.add_attribute("software", software)
|
av_signature_object.add_attribute("software", software)
|
||||||
|
av_signature_object.add_reference(original_attribute["uuid"], "belongs-to")
|
||||||
misp_event.add_object(av_signature_object)
|
misp_event.add_object(av_signature_object)
|
||||||
|
|
||||||
event = json.loads(misp_event.to_json())
|
event = json.loads(misp_event.to_json())
|
||||||
|
@ -53,7 +55,7 @@ def connect_to_clamav(connection_string: str) -> clamd.ClamdNetworkSocket:
|
||||||
host, port = connection_string.split(":")
|
host, port = connection_string.split(":")
|
||||||
return clamd.ClamdNetworkSocket(host, int(port))
|
return clamd.ClamdNetworkSocket(host, int(port))
|
||||||
else:
|
else:
|
||||||
raise Exception("ClamAV connection string is invalid")
|
raise Exception("ClamAV connection string is invalid. It must be unix socket path with 'unix://' prefix or IP:PORT.")
|
||||||
|
|
||||||
|
|
||||||
def handler(q=False):
|
def handler(q=False):
|
||||||
|
@ -70,11 +72,10 @@ def handler(q=False):
|
||||||
if not attribute:
|
if not attribute:
|
||||||
return {"error": "No attribute provided"}
|
return {"error": "No attribute provided"}
|
||||||
|
|
||||||
attribute_type = attribute.get("type")
|
if not check_input_attribute(request['attribute']):
|
||||||
if not attribute_type:
|
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
|
||||||
return {"error": "No attribute type provided"}
|
|
||||||
|
|
||||||
if attribute_type not in mispattributes["input"]:
|
if attribute["type"] not in mispattributes["input"]:
|
||||||
return {"error": "Invalid attribute type provided, expected 'malware-sample' or 'attachment'"}
|
return {"error": "Invalid attribute type provided, expected 'malware-sample' or 'attachment'"}
|
||||||
|
|
||||||
attribute_data = attribute.get("data")
|
attribute_data = attribute.get("data")
|
||||||
|
@ -94,7 +95,7 @@ def handler(q=False):
|
||||||
logging.exception("Provided data is not valid base64 encoded string")
|
logging.exception("Provided data is not valid base64 encoded string")
|
||||||
return {"error": "Provided data is not valid base64 encoded string"}
|
return {"error": "Provided data is not valid base64 encoded string"}
|
||||||
|
|
||||||
if attribute_type == "malware-sample":
|
if attribute["type"] == "malware-sample":
|
||||||
try:
|
try:
|
||||||
with zipfile.ZipFile(io.BytesIO(data)) as zipf:
|
with zipfile.ZipFile(io.BytesIO(data)) as zipf:
|
||||||
data = zipf.read(zipf.namelist()[0], pwd=b"infected")
|
data = zipf.read(zipf.namelist()[0], pwd=b"infected")
|
||||||
|
@ -113,7 +114,7 @@ def handler(q=False):
|
||||||
elif status == "OK":
|
elif status == "OK":
|
||||||
return {"results": {}}
|
return {"results": {}}
|
||||||
elif status == "FOUND":
|
elif status == "FOUND":
|
||||||
return create_response(software_version, reason)
|
return create_response(attribute, software_version, reason)
|
||||||
else:
|
else:
|
||||||
return {"error": "ClamAV returned invalid status {}: {}".format(status, reason)}
|
return {"error": "ClamAV returned invalid status {}: {}".format(status, reason)}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue