misp-modules/misp_modules/modules/expansion/clamav.py

128 lines
4.0 KiB
Python
Raw Normal View History

import base64
import io
import json
import logging
import sys
import zipfile
import clamd
from typing import Optional
from pymisp import MISPEvent, MISPObject
log = logging.getLogger("clamav")
log.setLevel(logging.DEBUG)
sh = logging.StreamHandler(sys.stdout)
sh.setLevel(logging.DEBUG)
fmt = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
sh.setFormatter(fmt)
log.addHandler(sh)
moduleinfo = {
"full_name": "ClamAV",
"version": "0.1",
"author": "Jakub Onderka",
"description": "Submit file to ClamAV",
"module-type": ["expansion"]
}
moduleconfig = ["connection"]
mispattributes = {
"input": ["attachment", "malware-sample"],
"format": "misp_standard"
}
def create_response(software: str, signature: Optional[str] = None) -> dict:
misp_event = MISPEvent()
if signature:
av_signature_object = MISPObject("av-signature")
av_signature_object.add_attribute("signature", signature)
av_signature_object.add_attribute("software", software)
misp_event.add_object(av_signature_object)
event = json.loads(misp_event.to_json())
results = {key: event[key] for key in ('Attribute', 'Object') if (key in event and event[key])}
return {"results": results}
def connect_to_clamav(connection_string: str) -> clamd.ClamdNetworkSocket:
if connection_string.startswith("unix://"):
return clamd.ClamdUnixSocket(connection_string.replace("unix://", ""))
elif ":" in connection_string:
host, port = connection_string.split(":")
return clamd.ClamdNetworkSocket(host, port)
else:
raise Exception("ClamAV connection string is invalid")
def handler(q=False):
if q is False:
return False
request = json.loads(q)
connection_string: str = request["config"].get("connection")
if not connection_string:
return {"error": "No ClamAV connection string provided"}
attribute = request.get("attribute")
if not attribute:
return {"error": "No attribute provided"}
attribute_type = attribute.get("type")
if not attribute_type:
return {"error": "No attribute type provided"}
if attribute_type not in mispattributes["input"]:
return {"error": "Invalid attribute type provided, expected 'malware-sample' or 'attachment'"}
attribute_data = attribute.get("data")
if not attribute_data:
return {"error": "No attribute data provided"}
try:
clamav = connect_to_clamav(connection_string)
software_version = clamav.version()
except Exception:
logging.exception("Could not connect to ClamAV")
return {"error": "Could not connect to ClamAV"}
try:
data = base64.b64decode(attribute_data, validate=True)
except Exception:
logging.exception("Provided data is not valid base64 encoded string")
return {"error": "Provided data is not valid base64 encoded string"}
if attribute_type == "malware-sample":
try:
with zipfile.ZipFile(io.BytesIO(data)) as zipf:
data = zipf.read(zipf.namelist()[0], pwd=b"infected")
except Exception:
logging.exception("Could not extract malware sample from ZIP file")
return {"error": "Could not extract malware sample from ZIP file"}
try:
status, reason = clamav.instream(io.BytesIO(data))["stream"]
except Exception:
logging.exception("Could not send attribute data to ClamAV. Maybe file is too big?")
return {"error": "Could not send attribute data to ClamAV. Maybe file is too big?"}
if status == "ERROR":
return {"error": "ClamAV returned error message: {}".format(reason)}
elif status == "OK":
return {"results": {}}
elif status == "FOUND":
return create_response(software_version, reason)
else:
return {"error": "ClamAV returned invalid status {}: {}".format(status, reason)}
def introspection():
return mispattributes
def version():
moduleinfo["config"] = moduleconfig
return moduleinfo