new: [clamav] Module for malware scan by ClamAV

pull/431/head
Jakub Onderka 2020-10-17 20:41:02 +02:00
parent 095fbfd75f
commit f2de7ab87f
2 changed files with 128 additions and 0 deletions
misp_modules/modules/expansion

View File

@ -26,6 +26,7 @@ click==7.1.2; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2,
colorama==0.4.3; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'
configparser==5.0.1; python_version >= '3.6'
cryptography==3.1.1
clamd==1.0.2
decorator==4.4.2
deprecated==1.2.10; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'
dnspython==2.0.0

View File

@ -0,0 +1,127 @@
import base64
import io
import json
import logging
import sys
import zipfile
import clamd
from typing import Optional
from pymisp import MISPEvent, MISPObject
log = logging.getLogger("clamav")
log.setLevel(logging.DEBUG)
sh = logging.StreamHandler(sys.stdout)
sh.setLevel(logging.DEBUG)
fmt = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
sh.setFormatter(fmt)
log.addHandler(sh)
moduleinfo = {
"full_name": "ClamAV",
"version": "0.1",
"author": "Jakub Onderka",
"description": "Submit file to ClamAV",
"module-type": ["expansion"]
}
moduleconfig = ["connection"]
mispattributes = {
"input": ["attachment", "malware-sample"],
"format": "misp_standard"
}
def create_response(software: str, signature: Optional[str] = None) -> dict:
misp_event = MISPEvent()
if signature:
av_signature_object = MISPObject("av-signature")
av_signature_object.add_attribute("signature", signature)
av_signature_object.add_attribute("software", software)
misp_event.add_object(av_signature_object)
event = json.loads(misp_event.to_json())
results = {key: event[key] for key in ('Attribute', 'Object') if (key in event and event[key])}
return {"results": results}
def connect_to_clamav(connection_string: str) -> clamd.ClamdNetworkSocket:
if connection_string.startswith("unix://"):
return clamd.ClamdUnixSocket(connection_string.replace("unix://", ""))
elif ":" in connection_string:
host, port = connection_string.split(":")
return clamd.ClamdNetworkSocket(host, port)
else:
raise Exception("ClamAV connection string is invalid")
def handler(q=False):
if q is False:
return False
request = json.loads(q)
connection_string: str = request["config"].get("connection")
if not connection_string:
return {"error": "No ClamAV connection string provided"}
attribute = request.get("attribute")
if not attribute:
return {"error": "No attribute provided"}
attribute_type = attribute.get("type")
if not attribute_type:
return {"error": "No attribute type provided"}
if attribute_type not in mispattributes["input"]:
return {"error": "Invalid attribute type provided, expected 'malware-sample' or 'attachment'"}
attribute_data = attribute.get("data")
if not attribute_data:
return {"error": "No attribute data provided"}
try:
clamav = connect_to_clamav(connection_string)
software_version = clamav.version()
except Exception:
logging.exception("Could not connect to ClamAV")
return {"error": "Could not connect to ClamAV"}
try:
data = base64.b64decode(attribute_data, validate=True)
except Exception:
logging.exception("Provided data is not valid base64 encoded string")
return {"error": "Provided data is not valid base64 encoded string"}
if attribute_type == "malware-sample":
try:
with zipfile.ZipFile(io.BytesIO(data)) as zipf:
data = zipf.read(zipf.namelist()[0], pwd=b"infected")
except Exception:
logging.exception("Could not extract malware sample from ZIP file")
return {"error": "Could not extract malware sample from ZIP file"}
try:
status, reason = clamav.instream(io.BytesIO(data))["stream"]
except Exception:
logging.exception("Could not send attribute data to ClamAV. Maybe file is too big?")
return {"error": "Could not send attribute data to ClamAV. Maybe file is too big?"}
if status == "ERROR":
return {"error": "ClamAV returned error message: {}".format(reason)}
elif status == "OK":
return {"results": {}}
elif status == "FOUND":
return create_response(software_version, reason)
else:
return {"error": "ClamAV returned invalid status {}: {}".format(status, reason)}
def introspection():
return mispattributes
def version():
moduleinfo["config"] = moduleconfig
return moduleinfo