2020-11-30 12:06:19 +01:00
|
|
|
import base64
|
|
|
|
import json
|
|
|
|
import re
|
|
|
|
|
|
|
|
from abc import ABC, abstractmethod
|
|
|
|
from dataclasses import asdict, dataclass, field
|
|
|
|
from enum import Enum
|
|
|
|
from pathlib import PureWindowsPath
|
|
|
|
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
|
|
|
|
|
|
|
|
from pymisp import MISPAttribute, MISPEvent, MISPObject
|
|
|
|
|
|
|
|
from .rest_api import VMRayRESTAPI, VMRayRESTAPIError
|
|
|
|
|
|
|
|
|
|
|
|
# Patterns matching the per-user prefix of a Windows path. The leading ".:."
# matches one character, a colon and one more character (drive letter, colon
# and separator in practice); the captured group is the user directory name.
USER_RE = re.compile(r".:.Users\\(.*?)\\", re.IGNORECASE)
# The directory name must be followed directly by the separator; the former
# pattern required an extra arbitrary character in between and therefore
# never matched a path such as C:\DOCUME~1\user\...
DOC_RE = re.compile(r".:.DOCUME~1\\(.*?)\\", re.IGNORECASE)
DOC_AND_SETTINGS_RE = re.compile(r".:.Documents and Settings\\(.*?)\\", re.IGNORECASE)
# All user-profile patterns, applied one after another when normalising paths.
USERPROFILES = [USER_RE, DOC_RE, DOC_AND_SETTINGS_RE]
|
|
|
|
|
|
|
|
|
|
|
|
def classifications_to_str(classifications: List[str]) -> Optional[str]:
    """Render classifications as a single comment string.

    Returns ``"Classifications: a, b"`` for a non-empty list and ``None``
    when the list is empty (or otherwise falsy).
    """
    if not classifications:
        return None

    joined = ", ".join(classifications)
    return "Classifications: " + joined
|
|
|
|
|
|
|
|
|
|
|
|
def merge_lists(target: List[Any], source: List[Any]):
    """Return the de-duplicated union of ``target`` and ``source``.

    Elements keep the order of their first occurrence (``target`` first,
    then ``source``), so the result is deterministic. Elements must be
    hashable. Neither input list is modified.
    """
    # dict preserves insertion order, unlike a set.
    return list(dict.fromkeys([*target, *source]))
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class Attribute:
    """Flat attribute record (type, value and optional metadata)."""

    type: str
    value: str
    category: Optional[str] = None
    comment: Optional[str] = None
    to_ids: bool = False

    def __eq__(self, other: Dict[str, Any]) -> bool:
        """Compare the dict form of this attribute with ``other``."""
        as_mapping = asdict(self)
        return as_mapping == other
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class Artifact(ABC):
    """Common base for every artifact type extracted from a report.

    Deriving from ``ABC`` makes the ``@abstractmethod`` markers effective:
    a subclass that does not implement all of them cannot be instantiated.
    """

    # Whether the artifact is flagged as an indicator of compromise.
    is_ioc: bool
    # Verdict used for the ``vmray:verdict`` tag; a falsy value adds no tag.
    verdict: Optional[str]

    @abstractmethod
    def to_attributes(self) -> Iterator[Attribute]:
        """Yield the flat ``Attribute`` records describing this artifact."""
        raise NotImplementedError()

    @abstractmethod
    def to_misp_object(self, tag: bool) -> MISPObject:
        """Build the MISP object for this artifact; ``tag`` enables tagging."""
        raise NotImplementedError()

    @abstractmethod
    def merge(self, other: "Artifact") -> None:
        """Fold the data of an equal artifact ``other`` into this one."""
        raise NotImplementedError()

    @abstractmethod
    def __eq__(self, other: "Artifact") -> bool:
        """Return whether ``other`` denotes the same artifact."""
        raise NotImplementedError()

    def tag_artifact_attribute(self, attribute: Optional[MISPAttribute]) -> None:
        """Attach the IOC and verdict tags to ``attribute``.

        ``None`` is accepted and ignored so callers can pass through the
        result of an ``add_attribute`` call that did not create an attribute.
        """
        if attribute is None:
            return

        if self.is_ioc:
            attribute.add_tag('vmray:artifact="IOC"')

        if self.verdict:
            attribute.add_tag(f'vmray:verdict="{self.verdict}"')
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class DomainArtifact(Artifact):
    """A domain artifact together with the IP addresses attached to it."""

    domain: str
    sources: List[str]
    ips: List[str] = field(default_factory=list)
    classifications: List[str] = field(default_factory=list)

    def to_attributes(self) -> Iterator[Attribute]:
        """Yield a single ``domain`` attribute commented with the sources."""
        source_note = None
        if self.sources:
            source_note = ", ".join(self.sources)

        yield Attribute(type="domain", value=self.domain, comment=source_note)

    def to_misp_object(self, tag: bool) -> MISPObject:
        """Build a ``domain-ip`` object holding the domain and all its IPs."""
        domain_ip = MISPObject(name="domain-ip")

        domain_attr = domain_ip.add_attribute(
            "domain",
            value=self.domain,
            to_ids=self.is_ioc,
            comment=classifications_to_str(self.classifications),
        )
        if tag:
            self.tag_artifact_attribute(domain_attr)

        for address in self.ips:
            domain_ip.add_attribute("ip", value=address, to_ids=self.is_ioc)

        return domain_ip

    def merge(self, other: Artifact) -> None:
        """Union the IPs and classifications of another domain artifact."""
        if isinstance(other, DomainArtifact):
            self.ips = merge_lists(self.ips, other.ips)
            self.classifications = merge_lists(
                self.classifications, other.classifications
            )

    def __eq__(self, other: Artifact) -> bool:
        """Two domain artifacts are equal when their domain names match."""
        if isinstance(other, DomainArtifact):
            return self.domain == other.domain

        return NotImplemented
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class EmailArtifact(Artifact):
    """An e-mail artifact: sender, subject and recipients."""

    sender: Optional[str]
    subject: Optional[str]
    recipients: List[str] = field(default_factory=list)
    classifications: List[str] = field(default_factory=list)

    def to_attributes(self) -> Iterator[Attribute]:
        """Yield ``email-src``, ``email-subject`` and ``email-dst`` attributes."""
        if self.sender:
            yield Attribute(
                type="email-src",
                value=self.sender,
                comment=classifications_to_str(self.classifications),
            )

        if self.subject:
            yield Attribute(type="email-subject", value=self.subject, to_ids=False)

        for addressee in self.recipients:
            yield Attribute(type="email-dst", value=addressee, to_ids=False)

    def to_misp_object(self, tag: bool) -> MISPObject:
        """Build an ``email`` object; only the sender attribute is tagged."""
        email_obj = MISPObject(name="email")

        if self.sender:
            sender_attr = email_obj.add_attribute(
                "from",
                value=self.sender,
                to_ids=self.is_ioc,
                comment=classifications_to_str(self.classifications),
            )
            if tag:
                self.tag_artifact_attribute(sender_attr)

        if self.subject:
            email_obj.add_attribute("subject", value=self.subject, to_ids=False)

        for addressee in self.recipients:
            email_obj.add_attribute("to", value=addressee, to_ids=False)

        return email_obj

    def merge(self, other: Artifact) -> None:
        """Union the recipients and classifications of another e-mail."""
        if isinstance(other, EmailArtifact):
            self.recipients = merge_lists(self.recipients, other.recipients)
            self.classifications = merge_lists(
                self.classifications, other.classifications
            )

    def __eq__(self, other: Artifact) -> bool:
        """E-mails are equal when both sender and subject match."""
        if isinstance(other, EmailArtifact):
            return self.sender == other.sender and self.subject == other.subject

        return NotImplemented
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class FileArtifact(Artifact):
    """A file artifact identified by its hashes and known file names."""

    filenames: List[str]
    operations: List[str]
    md5: str
    sha1: str
    sha256: str
    ssdeep: str
    imphash: Optional[str]
    classifications: List[str]
    size: Optional[int]
    mimetype: Optional[str] = None

    def to_attributes(self) -> Iterator[Attribute]:
        """Yield ``filename`` attributes, then ``filename|<hash>`` pairs.

        Hash types whose value is ``None`` are left out.
        """
        comment = "File operations: " + ", ".join(self.operations)

        for name in self.filenames:
            yield Attribute(type="filename", value=name, comment=comment)

        for hash_type in ("md5", "sha1", "sha256", "ssdeep", "imphash"):
            digest = getattr(self, hash_type)
            if digest is None:
                continue

            for name in self.filenames:
                yield Attribute(
                    type=f"filename|{hash_type}",
                    value=f"{name}|{digest}",
                    category="Payload delivery",
                    to_ids=True,
                )

    def to_misp_object(self, tag: bool) -> MISPObject:
        """Build a ``file`` object with size, hashes, MIME type and paths."""
        file_obj = MISPObject(name="file")

        if self.size:
            file_obj.add_attribute("size-in-bytes", value=self.size)

        label = classifications_to_str(self.classifications)
        for relation in ("md5", "sha1", "sha256", "ssdeep"):
            digest = getattr(self, relation)
            if not digest:
                continue

            hash_attr = file_obj.add_attribute(
                relation, value=digest, to_ids=self.is_ioc, comment=label
            )

            if tag:
                self.tag_artifact_attribute(hash_attr)

        if self.mimetype:
            file_obj.add_attribute("mimetype", value=self.mimetype, to_ids=False)

        operations_note = None
        if self.operations:
            operations_note = "Operations: " + ", ".join(self.operations)

        for raw_path in self.filenames:
            windows_path = PureWindowsPath(raw_path)
            file_obj.add_attribute(
                "filename", value=windows_path.name, comment=operations_note
            )

            # Replace the user-specific path prefix with a placeholder.
            fullpath = str(windows_path)
            for pattern in USERPROFILES:
                fullpath = pattern.sub(r"%USERPROFILE%\\", fullpath)

            file_obj.add_attribute("fullpath", fullpath)

        return file_obj

    def merge(self, other: Artifact) -> None:
        """Union file names, operations and classifications of another file."""
        if isinstance(other, FileArtifact):
            self.filenames = merge_lists(self.filenames, other.filenames)
            self.operations = merge_lists(self.operations, other.operations)
            self.classifications = merge_lists(
                self.classifications, other.classifications
            )

    def __eq__(self, other: Artifact) -> bool:
        """Files are equal when their SHA-256 values match."""
        if isinstance(other, FileArtifact):
            return self.sha256 == other.sha256

        return NotImplemented
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class IpArtifact(Artifact):
    """An IP address artifact and the sources it was found in."""

    ip: str
    sources: List[str]
    classifications: List[str] = field(default_factory=list)

    def to_attributes(self) -> Iterator[Attribute]:
        """Yield one ``ip-dst`` attribute commented with the sources."""
        found_in = "Found in: " + ", ".join(self.sources)
        yield Attribute(type="ip-dst", value=self.ip, comment=found_in)

    def to_misp_object(self, tag: bool) -> MISPObject:
        """Build an ``ip-port`` object containing the address."""
        ip_obj = MISPObject(name="ip-port")

        ip_attr = ip_obj.add_attribute(
            "ip",
            value=self.ip,
            comment=classifications_to_str(self.classifications),
            to_ids=self.is_ioc,
        )
        if tag:
            self.tag_artifact_attribute(ip_attr)

        return ip_obj

    def merge(self, other: Artifact) -> None:
        """Union the sources and classifications of another IP artifact."""
        if isinstance(other, IpArtifact):
            self.sources = merge_lists(self.sources, other.sources)
            self.classifications = merge_lists(
                self.classifications, other.classifications
            )

    def __eq__(self, other: Artifact) -> bool:
        """IP artifacts are equal when their addresses match."""
        if isinstance(other, IpArtifact):
            return self.ip == other.ip

        return NotImplemented
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class MutexArtifact(Artifact):
    """A mutex artifact and the operations performed on it."""

    name: str
    operations: List[str]
    classifications: List[str] = field(default_factory=list)

    def to_attributes(self) -> Iterator[Attribute]:
        """Yield one ``mutex`` attribute commented with the operations."""
        operations = ", ".join(self.operations)
        comment = f"Operations: {operations}"

        yield Attribute(type="mutex", value=self.name, comment=comment)

    def to_misp_object(self, tag: bool) -> MISPObject:
        """Build a ``mutex`` object with its name and, if any, operations."""
        obj = MISPObject(name="mutex")

        classifications = classifications_to_str(self.classifications)
        attr = obj.add_attribute(
            "name",
            value=self.name,
            category="External analysis",
            to_ids=False,
            comment=classifications,
        )
        if tag:
            self.tag_artifact_attribute(attr)

        # Only add a description when there is something to describe, so
        # that add_attribute is never handed a None value.
        if self.operations:
            operations = "Operations: " + ", ".join(self.operations)
            obj.add_attribute("description", value=operations, to_ids=False)

        return obj

    def merge(self, other: Artifact) -> None:
        """Union the operations and classifications of another mutex."""
        if not isinstance(other, MutexArtifact):
            return

        self.operations = merge_lists(self.operations, other.operations)
        self.classifications = merge_lists(self.classifications, other.classifications)

    def __eq__(self, other: Artifact) -> bool:
        """Mutex artifacts are equal when their names match."""
        if not isinstance(other, MutexArtifact):
            return NotImplemented

        return self.name == other.name
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class ProcessArtifact(Artifact):
    """A process artifact: image name, PIDs and command line."""

    filename: str
    pid: Optional[int] = None
    parent_pid: Optional[int] = None
    cmd_line: Optional[str] = None
    operations: List[str] = field(default_factory=list)
    classifications: List[str] = field(default_factory=list)

    def to_attributes(self) -> Iterator[Attribute]:
        """Yield one ``text`` attribute describing the created process."""
        process_desc = f"Process created: {self.filename}\nPID: {self.pid}"
        classifications = classifications_to_str(self.classifications)
        yield Attribute(type="text", value=process_desc, comment=classifications)

    def to_misp_object(self, tag: bool) -> MISPObject:
        """Build a ``process`` object.

        The command line is optional; when it is missing no ``command-line``
        attribute is added, and only attributes that were actually created
        are tagged.
        """
        obj = MISPObject(name="process")

        if self.pid:
            obj.add_attribute("pid", value=self.pid, category="External analysis")

        if self.parent_pid:
            obj.add_attribute(
                "parent-pid", value=self.parent_pid, category="External analysis"
            )

        classifications = classifications_to_str(self.classifications)
        name_attr = obj.add_attribute(
            "name", self.filename, category="External analysis", comment=classifications
        )

        # cmd_line defaults to None; add_attribute would not create an
        # attribute for it, and tagging the missing result would fail.
        cmd_attr = None
        if self.cmd_line is not None:
            cmd_attr = obj.add_attribute("command-line", value=self.cmd_line)

        if tag:
            for attr in (name_attr, cmd_attr):
                if attr is not None:
                    self.tag_artifact_attribute(attr)

        return obj

    def merge(self, other: Artifact) -> None:
        """Union the operations and classifications of another process."""
        if not isinstance(other, ProcessArtifact):
            return

        self.operations = merge_lists(self.operations, other.operations)
        self.classifications = merge_lists(self.classifications, other.classifications)

    def __eq__(self, other: Artifact) -> bool:
        """Processes are equal when image name and command line match."""
        if not isinstance(other, ProcessArtifact):
            return NotImplemented

        return self.filename == other.filename and self.cmd_line == other.cmd_line
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class RegistryArtifact(Artifact):
    """A registry key artifact and the operations performed on it."""

    key: str
    operations: List[str]

    def to_attributes(self) -> Iterator[Attribute]:
        """Yield one ``regkey`` attribute commented with the operations."""
        note = "Operations: " + ", ".join(self.operations)
        yield Attribute(type="regkey", value=self.key, comment=note)

    def to_misp_object(self, tag: bool) -> MISPObject:
        """Build a ``registry-key`` object containing the key."""
        registry_obj = MISPObject(name="registry-key")

        note = "Operations: " + ", ".join(self.operations) if self.operations else None

        key_attr = registry_obj.add_attribute(
            "key", value=self.key, to_ids=self.is_ioc, comment=note
        )
        if tag:
            self.tag_artifact_attribute(key_attr)

        return registry_obj

    def merge(self, other: Artifact) -> None:
        """Union the operations of another registry artifact."""
        if isinstance(other, RegistryArtifact):
            self.operations = merge_lists(self.operations, other.operations)

    def __eq__(self, other: Artifact) -> bool:
        """Registry artifacts are equal when their keys match."""
        if isinstance(other, RegistryArtifact):
            return self.key == other.key

        return NotImplemented
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class UrlArtifact(Artifact):
    """A URL artifact with its optional domain and IP addresses."""

    url: str
    operations: List[str]
    domain: Optional[str] = None
    ips: List[str] = field(default_factory=list)

    def to_attributes(self) -> Iterator[Attribute]:
        """Yield one ``url`` attribute commented with the operations."""
        note = "Operations: " + ", ".join(self.operations)
        yield Attribute(type="url", value=self.url, comment=note)

    def to_misp_object(self, tag: bool) -> MISPObject:
        """Build a ``url`` object; only the URL attribute itself is tagged."""
        url_obj = MISPObject(name="url")

        note = "Operations: " + ", ".join(self.operations) if self.operations else None

        url_attr = url_obj.add_attribute(
            "url",
            value=self.url,
            comment=note,
            category="External analysis",
            to_ids=False,
        )
        if tag:
            self.tag_artifact_attribute(url_attr)

        if self.domain:
            url_obj.add_attribute(
                "domain", self.domain, category="External analysis", to_ids=False
            )

        for address in self.ips:
            url_obj.add_attribute(
                "ip", address, category="External analysis", to_ids=False
            )

        return url_obj

    def merge(self, other: Artifact) -> None:
        """Union the IPs and operations of another URL artifact."""
        if isinstance(other, UrlArtifact):
            self.ips = merge_lists(self.ips, other.ips)
            self.operations = merge_lists(self.operations, other.operations)

    def __eq__(self, other: Artifact) -> bool:
        """URL artifacts are equal when both URL and domain match."""
        if isinstance(other, UrlArtifact):
            return self.url == other.url and self.domain == other.domain

        return NotImplemented
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class MitreAttack:
    """A MITRE ATT&CK technique given by description and identifier."""

    description: str
    id: str

    def to_misp_galaxy(self) -> str:
        """Return the ``misp-galaxy:mitre-attack-pattern`` tag for this technique."""
        label = f"{self.description} - {self.id}"
        return f'misp-galaxy:mitre-attack-pattern="{label}"'
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class VTI:
    """One VTI rule match: category, operation, technique and score."""

    category: str
    operation: str
    technique: str
    score: int
|
|
|
|
|
|
|
|
|
|
|
|
class ReportVersion(Enum):
    """Identifiers of the two report versions (``v1`` and ``v2``)."""

    v1 = "v1"
    v2 = "v2"
|
|
|
|
|
|
|
|
|
|
|
|
class VMRayParseError(Exception):
    """Exception type for report parsing failures."""
|
|
|
|
|
|
|
|
|
|
|
|
class ReportParser(ABC):
    """Interface every report parser has to implement."""

    @abstractmethod
    def __init__(self, api: VMRayRESTAPI, analysis_id: int):
        """Set up the parser for the analysis ``analysis_id``."""
        raise NotImplementedError()

    @abstractmethod
    def is_static_report(self) -> bool:
        """Return whether the report is a static one."""
        raise NotImplementedError()

    @abstractmethod
    def artifacts(self) -> Iterator[Artifact]:
        """Yield the artifacts contained in the report."""
        raise NotImplementedError()

    @abstractmethod
    def classifications(self) -> Optional[str]:
        """Return the report classifications as one string, or ``None``."""
        raise NotImplementedError()

    @abstractmethod
    def details(self) -> Iterator[str]:
        """Yield textual details about the analysis."""
        raise NotImplementedError()

    @abstractmethod
    def mitre_attacks(self) -> Iterator[MitreAttack]:
        """Yield the MITRE ATT&CK techniques listed in the report."""
        raise NotImplementedError()

    @abstractmethod
    def sandbox_type(self) -> str:
        """Return a description of the sandbox used for the analysis."""
        raise NotImplementedError()

    @abstractmethod
    def score(self) -> str:
        """Return the overall verdict of the report."""
        raise NotImplementedError()

    @abstractmethod
    def vtis(self) -> Iterator[VTI]:
        """Yield the VTI matches of the report."""
        raise NotImplementedError()
|
|
|
|
|
|
|
|
|
|
|
|
class Summary(ReportParser):
    """Parser for the ``summary.json`` report layout."""

    def __init__(
        self, analysis_id: int, api: VMRayRESTAPI = None, report: Dict[str, Any] = None
    ):
        """Use ``report`` when it is given and non-empty, else fetch it via ``api``."""
        self.analysis_id = analysis_id

        if not report:
            response = api.call(
                "GET",
                f"/rest/analysis/{analysis_id}/archive/logs/summary.json",
                raw_data=True,
            )
            report = json.load(response)

        self.report = report

    @staticmethod
    def to_verdict(score: Union[int, str]) -> Optional[str]:
        """Map a numeric score or a severity string to a verdict string.

        Integers map to ``clean`` (0-24), ``suspicious`` (25-74) and
        ``malicious`` (75-100); anything outside 0-100 gives ``n/a``.
        Strings are lower-cased and a few aliases are translated; other
        strings are returned lower-cased. Any other type gives ``None``.
        """
        if isinstance(score, int):
            if score < 0 or score > 100:
                return "n/a"
            if score <= 24:
                return "clean"
            if score <= 74:
                return "suspicious"
            return "malicious"

        if isinstance(score, str):
            aliases = {
                "not_suspicious": "clean",
                "whitelisted": "clean",
                "blacklisted": "malicious",
                "not_available": "n/a",
                "unknown": "n/a",
            }
            lowered = score.lower()
            return aliases.get(lowered, lowered)

        return None

    def is_static_report(self) -> bool:
        """Return whether the VTI rule type of the report is ``Static``."""
        rule_type = self.report["vti"]["vti_rule_type"]
        return rule_type == "Static"

    def artifacts(self) -> Iterator[Artifact]:
        """Yield all artifacts of the report, grouped by artifact kind."""
        section = self.report["artifacts"]

        for entry in section.get("domains", []):
            yield DomainArtifact(
                domain=entry["domain"],
                sources=entry["sources"],
                ips=entry.get("ip_addresses", []),
                classifications=entry.get("classifications", []),
                is_ioc=entry.get("ioc", False),
                verdict=self.to_verdict(entry.get("severity")),
            )

        for entry in section.get("emails", []):
            yield EmailArtifact(
                sender=entry.get("sender"),
                subject=entry.get("subject"),
                verdict=self.to_verdict(entry.get("severity")),
                recipients=entry.get("recipients", []),
                classifications=entry.get("classifications", []),
                is_ioc=entry.get("ioc", False),
            )

        for entry in section.get("files", []):
            primary_name = entry["filename"]
            if primary_name is None:
                continue

            names = [primary_name]
            if "filenames" in entry:
                names += entry["filenames"]

            digests = entry["hashes"]
            file_classifications = entry.get("classifications", [])
            file_operations = entry.get("operations", [])
            file_is_ioc = entry.get("ioc", False)
            file_mimetype = entry.get("mime_type")
            file_verdict = self.to_verdict(entry.get("severity"))

            # One artifact per hash record of the file entry.
            for digest in digests:
                yield FileArtifact(
                    filenames=names,
                    imphash=digest.get("imp_hash"),
                    md5=digest["md5_hash"],
                    ssdeep=digest["ssdeep_hash"],
                    sha256=digest["sha256_hash"],
                    sha1=digest["sha1_hash"],
                    operations=file_operations,
                    classifications=file_classifications,
                    size=entry.get("file_size"),
                    is_ioc=file_is_ioc,
                    mimetype=file_mimetype,
                    verdict=file_verdict,
                )

        for entry in section.get("ips", []):
            yield IpArtifact(
                ip=entry["ip_address"],
                sources=entry["sources"],
                classifications=entry.get("classifications", []),
                verdict=self.to_verdict(entry.get("severity")),
                is_ioc=entry.get("ioc", False),
            )

        for entry in section.get("mutexes", []):
            yield MutexArtifact(
                name=entry["mutex_name"],
                operations=entry["operations"],
                classifications=[],
                verdict=self.to_verdict(entry.get("severity")),
                is_ioc=entry.get("ioc", False),
            )

        for entry in section.get("processes", []):
            yield ProcessArtifact(
                filename=entry["image_name"],
                classifications=entry.get("classifications", []),
                cmd_line=entry.get("cmd_line"),
                verdict=self.to_verdict(entry.get("severity")),
                is_ioc=entry.get("ioc", False),
            )

        for entry in section.get("registry", []):
            yield RegistryArtifact(
                key=entry["reg_key_name"],
                operations=entry["operations"],
                verdict=self.to_verdict(entry.get("severity")),
                is_ioc=entry.get("ioc", False),
            )

        for entry in section.get("urls", []):
            yield UrlArtifact(
                url=entry["url"],
                operations=entry["operations"],
                ips=entry.get("ip_addresses", []),
                is_ioc=entry.get("ioc", False),
                verdict=self.to_verdict(entry.get("severity")),
            )

    def classifications(self) -> Optional[str]:
        """Return the report classifications as one string, ``None`` if empty."""
        labels = self.report["classifications"]
        if not labels:
            return None

        return "Classifications: " + ", ".join(labels)

    def details(self) -> Iterator[str]:
        """Yield execution status, termination reason and result lines."""
        info = self.report["analysis_details"]
        execution_successful = info["execution_successful"]
        termination_reason = info["termination_reason"]
        result = info["result_str"]

        # An analysis id of 0 is left out of the prefix.
        suffix = "" if self.analysis_id == 0 else f" {self.analysis_id}"

        yield f"Analysis{suffix}: execution_successful: {execution_successful}"
        yield f"Analysis{suffix}: termination_reason: {termination_reason}"
        yield f"Analysis{suffix}: result: {result}"

    def mitre_attacks(self) -> Iterator[MitreAttack]:
        """Yield one ``MitreAttack`` per technique listed in the report."""
        section = self.report["mitre_attack"]

        for technique in section.get("techniques", []):
            yield MitreAttack(
                description=technique["description"], id=technique["id"]
            )

    def sandbox_type(self) -> str:
        """Return ``"<vm name> | <sample type>"``."""
        vm_name = self.report["vm_and_analyzer_details"]["vm_name"]
        sample_type = self.report["sample_details"]["sample_type"]
        return " | ".join((f"{vm_name}", f"{sample_type}"))

    def score(self) -> str:
        """Return the verdict derived from the VTI score of the report."""
        return self.to_verdict(self.report["vti"]["vti_score"])

    def vtis(self) -> Iterator[VTI]:
        """Yield the VTI rule matches; none when the report lists no matches."""
        try:
            matches = self.report["vti"]["vti_rule_matches"]
        except KeyError:
            matches = []

        for match in matches:
            yield VTI(
                category=match["category_desc"],
                operation=match["operation_desc"],
                technique=match["technique_desc"],
                score=match["rule_score"],
            )
|
|
|
|
|
|
|
|
|
|
|
|
class SummaryV2(ReportParser):
|
|
|
|
def __init__(
|
|
|
|
self, analysis_id: int, api: VMRayRESTAPI = None, report: Dict[str, Any] = None
|
|
|
|
):
|
|
|
|
self.analysis_id = analysis_id
|
|
|
|
|
|
|
|
if report:
|
|
|
|
self.report = report
|
|
|
|
else:
|
|
|
|
self.api = api
|
|
|
|
data = api.call(
|
|
|
|
"GET",
|
|
|
|
f"/rest/analysis/{analysis_id}/archive/logs/summary_v2.json",
|
|
|
|
raw_data=True,
|
|
|
|
)
|
|
|
|
self.report = json.load(data)
|
|
|
|
|
|
|
|
def _resolve_refs(
|
|
|
|
self, data: Union[List[Dict[str, Any]], Dict[str, Any]]
|
|
|
|
) -> Iterator[Dict[str, Any]]:
|
|
|
|
if not data:
|
|
|
|
return []
|
|
|
|
|
|
|
|
if isinstance(data, dict):
|
|
|
|
data = [data]
|
|
|
|
|
|
|
|
for ref in data:
|
|
|
|
yield self._resolve_ref(ref)
|
|
|
|
|
|
|
|
def _resolve_ref(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
|
|
|
if data == {}:
|
|
|
|
return {}
|
|
|
|
|
|
|
|
if data["_type"] != "reference" or data["source"] != "logs/summary_v2.json":
|
|
|
|
return {}
|
|
|
|
|
|
|
|
resolved_ref = self.report
|
|
|
|
paths = data["path"]
|
|
|
|
for path_part in paths:
|
|
|
|
try:
|
|
|
|
resolved_ref = resolved_ref[path_part]
|
|
|
|
except KeyError:
|
|
|
|
return {}
|
|
|
|
|
|
|
|
return resolved_ref
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def convert_verdict(verdict: Optional[str]) -> str:
|
|
|
|
if verdict == "not_available" or not verdict:
|
|
|
|
return "n/a"
|
|
|
|
|
|
|
|
return verdict
|
|
|
|
|
|
|
|
def is_static_report(self) -> bool:
|
|
|
|
return self.report["vti"]["score_type"] == "static"
|
|
|
|
|
|
|
|
def artifacts(self) -> Iterator[Artifact]:
|
|
|
|
artifacts = self.report["artifacts"]
|
|
|
|
|
|
|
|
ref_domains = artifacts.get("ref_domains", [])
|
|
|
|
for domain in self._resolve_refs(ref_domains):
|
|
|
|
classifications = domain.get("classifications", [])
|
|
|
|
artifact = DomainArtifact(
|
|
|
|
domain=domain["domain"],
|
|
|
|
sources=domain["sources"],
|
2020-11-30 12:48:01 +01:00
|
|
|
classifications=classifications,
|
2020-11-30 12:06:19 +01:00
|
|
|
is_ioc=domain["is_ioc"],
|
|
|
|
verdict=domain["verdict"],
|
|
|
|
)
|
|
|
|
|
|
|
|
ref_ip_addresses = domain.get("ref_ip_addresses", [])
|
|
|
|
if not ref_ip_addresses:
|
|
|
|
continue
|
|
|
|
|
|
|
|
for ip_address in self._resolve_refs(ref_ip_addresses):
|
|
|
|
artifact.ips.append(ip_address["ip_address"])
|
|
|
|
|
|
|
|
yield artifact
|
|
|
|
|
|
|
|
ref_emails = artifacts.get("ref_emails", [])
|
|
|
|
for email in self._resolve_refs(ref_emails):
|
|
|
|
sender = email.get("sender")
|
|
|
|
subject = email.get("subject")
|
|
|
|
recipients = email.get("recipients", [])
|
|
|
|
verdict = email["verdict"]
|
|
|
|
is_ioc = email["is_ioc"]
|
|
|
|
classifications = email.get("classifications", [])
|
|
|
|
|
|
|
|
artifact = EmailArtifact(
|
|
|
|
sender=sender,
|
|
|
|
subject=subject,
|
|
|
|
recipients=recipients,
|
|
|
|
classifications=classifications,
|
|
|
|
verdict=verdict,
|
|
|
|
is_ioc=is_ioc,
|
|
|
|
)
|
|
|
|
|
|
|
|
yield artifact
|
|
|
|
|
|
|
|
ref_files = artifacts.get("ref_files", [])
|
|
|
|
for file_ in self._resolve_refs(ref_files):
|
|
|
|
filenames = []
|
|
|
|
|
|
|
|
if "ref_filenames" in file_:
|
|
|
|
for filename in self._resolve_refs(file_["ref_filenames"]):
|
|
|
|
if not filename:
|
|
|
|
continue
|
|
|
|
filenames.append(filename["filename"])
|
|
|
|
|
|
|
|
artifact = FileArtifact(
|
|
|
|
operations=file_.get("operations", []),
|
|
|
|
md5=file_["hash_values"]["md5"],
|
|
|
|
sha1=file_["hash_values"]["sha1"],
|
|
|
|
sha256=file_["hash_values"]["sha256"],
|
|
|
|
ssdeep=file_["hash_values"]["ssdeep"],
|
|
|
|
imphash=None,
|
|
|
|
mimetype=file_.get("mime_type"),
|
|
|
|
filenames=filenames,
|
|
|
|
is_ioc=file_["is_ioc"],
|
|
|
|
classifications=file_.get("classifications", []),
|
|
|
|
size=file_["size"],
|
|
|
|
verdict=file_["verdict"],
|
|
|
|
)
|
|
|
|
yield artifact
|
|
|
|
|
|
|
|
ref_ip_addresses = artifacts.get("ref_ip_addresses", [])
|
|
|
|
for ip in self._resolve_refs(ref_ip_addresses):
|
|
|
|
classifications = ip.get("classifications", [])
|
|
|
|
verdict = ip["verdict"]
|
|
|
|
is_ioc = ip["is_ioc"]
|
|
|
|
artifact = IpArtifact(
|
|
|
|
ip=ip["ip_address"],
|
|
|
|
sources=ip["sources"],
|
|
|
|
classifications=classifications,
|
|
|
|
verdict=verdict,
|
|
|
|
is_ioc=is_ioc,
|
|
|
|
)
|
|
|
|
yield artifact
|
|
|
|
|
|
|
|
ref_mutexes = artifacts.get("ref_mutexes", [])
|
|
|
|
for mutex in self._resolve_refs(ref_mutexes):
|
|
|
|
is_ioc = mutex["is_ioc"]
|
|
|
|
classifications = mutex.get("classifications", [])
|
|
|
|
artifact = MutexArtifact(
|
|
|
|
name=mutex["name"],
|
|
|
|
operations=mutex["operations"],
|
|
|
|
verdict=mutex["verdict"],
|
|
|
|
classifications=classifications,
|
|
|
|
is_ioc=is_ioc,
|
|
|
|
)
|
|
|
|
yield artifact
|
|
|
|
|
|
|
|
ref_processes = artifacts.get("ref_processes", [])
|
|
|
|
for process in self._resolve_refs(ref_processes):
|
|
|
|
cmd_line = process.get("cmd_line")
|
|
|
|
classifications = process.get("classifications", [])
|
|
|
|
verdict = process.get("verdict")
|
|
|
|
artifact = ProcessArtifact(
|
|
|
|
pid=process["os_pid"],
|
|
|
|
parent_pid=process["origin_monitor_id"],
|
|
|
|
filename=process["filename"],
|
|
|
|
is_ioc=process["is_ioc"],
|
|
|
|
cmd_line=cmd_line,
|
|
|
|
classifications=classifications,
|
|
|
|
verdict=verdict,
|
|
|
|
)
|
2020-11-30 12:48:01 +01:00
|
|
|
yield artifact
|
2020-11-30 12:06:19 +01:00
|
|
|
|
|
|
|
ref_registry_records = artifacts.get("ref_registry_records", [])
|
|
|
|
for reg in self._resolve_refs(ref_registry_records):
|
|
|
|
artifact = RegistryArtifact(
|
|
|
|
key=reg["reg_key_name"],
|
|
|
|
operations=reg["operations"],
|
|
|
|
is_ioc=reg["is_ioc"],
|
|
|
|
verdict=reg["verdict"],
|
|
|
|
)
|
|
|
|
yield artifact
|
|
|
|
|
|
|
|
url_refs = artifacts.get("ref_urls", [])
|
|
|
|
for url in self._resolve_refs(url_refs):
|
|
|
|
domain = None
|
|
|
|
ref_domain = url.get("ref_domain", {})
|
|
|
|
if ref_domain:
|
|
|
|
domain = self._resolve_ref(ref_domain)["domain"]
|
|
|
|
|
|
|
|
ips = []
|
|
|
|
ref_ip_addresses = url.get("ref_ip_addresses", [])
|
|
|
|
for ip_address in self._resolve_refs(ref_ip_addresses):
|
|
|
|
ips.append(ip_address["ip_address"])
|
|
|
|
|
|
|
|
artifact = UrlArtifact(
|
|
|
|
url=url["url"],
|
|
|
|
operations=url["operations"],
|
|
|
|
is_ioc=url["is_ioc"],
|
|
|
|
domain=domain,
|
|
|
|
ips=ips,
|
|
|
|
verdict=url["verdict"],
|
|
|
|
)
|
|
|
|
yield artifact
|
|
|
|
|
|
|
|
def classifications(self) -> Optional[str]:
    """Return the report's classifications as one display string.

    Returns:
        ``"Classifications: a, b"`` when the report lists at least one
        classification, ``None`` when the ``classifications`` key is
        missing or its value is empty.
    """
    try:
        classifications = self.report["classifications"]
    except KeyError:
        return None

    # An empty list used to yield the dangling text "Classifications: ",
    # which callers treat as truthy and store as a result.
    if not classifications:
        return None

    return "Classifications: " + ", ".join(classifications)
|
|
|
|
|
|
|
|
def details(self) -> Iterator[str]:
    """Yield three text lines summarising the analysis run.

    The values come from ``report["analysis_metadata"]``; every line is
    prefixed with the analysis id of this report.
    """
    metadata = self.report["analysis_metadata"]

    # All three keys are read up front, before the first line is produced.
    labelled_values = (
        ("execution_successful", metadata["is_execution_successful"]),
        ("termination_reason", metadata["termination_reason"]),
        ("result", metadata["result_str"]),
    )

    for label, value in labelled_values:
        yield f"Analysis {self.analysis_id}: {label}: {value}"
|
|
|
|
|
|
|
|
def mitre_attacks(self) -> Iterator[MitreAttack]:
    """Yield one ``MitreAttack`` per technique listed in the report.

    Techniques are read from ``report["mitre_attack"]["v4"]["techniques"]``.
    The mapping key, with the text ``technique_`` removed, becomes the id.
    """
    techniques = self.report["mitre_attack"]["v4"]["techniques"]

    for raw_id, entry in techniques.items():
        yield MitreAttack(
            description=entry["description"],
            id=raw_id.replace("technique_", ""),
        )
|
|
|
|
|
|
|
|
def sandbox_type(self) -> str:
    """Return ``"<VM description> | <sample type>"`` for this report."""
    report = self.report
    vm_description = report["virtual_machine"]["description"]
    analysed_type = report["analysis_metadata"]["sample_type"]
    return f"{vm_description} | {analysed_type}"
|
|
|
|
|
|
|
|
def score(self) -> str:
    """Return the analysis verdict after passing it through ``convert_verdict``."""
    metadata = self.report["analysis_metadata"]
    return self.convert_verdict(metadata["verdict"])
|
|
|
|
|
|
|
|
def vtis(self) -> Iterator[VTI]:
    """Yield one ``VTI`` per entry of ``report["vti"]["matches"]``.

    Nothing is yielded when the ``vti`` section has no ``matches`` key.
    """
    vti_section = self.report["vti"]

    if "matches" in vti_section:
        for match in vti_section["matches"].values():
            yield VTI(
                category=match["category_desc"],
                operation=match["operation_desc"],
                technique=match["technique_desc"],
                score=match["analysis_score"],
            )
|
|
|
|
|
|
|
|
|
|
|
|
class VMRayParser:
|
|
|
|
def __init__(self) -> None:
    """Create a parser with default settings and empty result containers.

    Several of these defaults are replaced later by ``from_api`` or
    ``from_base64_string``.
    """
    # --- state used only by the API import ---
    self.api: Optional[VMRayRESTAPI] = None
    self.sample_id: Optional[int] = None

    # --- state used only by the file import ---
    self.report: Optional[Dict[str, Any]] = None
    self.report_name: Optional[str] = None
    self.include_report = False

    # --- settings shared by API import and file import ---
    self.report_version = ReportVersion.v2

    self.use_misp_object = True
    self.ignore_analysis_finished = False
    self.tag_objects = True

    # what to extract from each report
    self.include_analysis_id = True
    self.include_vti_details = True
    self.include_iocs = True
    self.include_all_artifacts = False
    self.include_analysis_details = True

    # --- result containers ---
    # a new event, filled when MISP objects are used
    self.event = MISPEvent()

    # plain attributes, filled when MISP objects are not used
    self.attributes: List[Attribute] = []
|
|
|
|
|
|
|
|
def from_api(self, config: Dict[str, Any]) -> None:
    """Configure the parser to fetch reports from a VMRay server.

    Reads the mandatory ``url``, ``apikey`` and ``Sample ID`` entries from
    *config*, creates the REST client, applies the optional settings and
    finally queries the server to decide which report version to parse.

    Raises:
        VMRayParseError: if a mandatory entry is missing, the sample id
            cannot be converted to an integer, or the server query fails.
    """
    url = self._read_config_key(config, "url")
    api_key = self._read_config_key(config, "apikey")
    raw_sample_id = self._read_config_key(config, "Sample ID")

    try:
        self.sample_id = int(raw_sample_id)
    except (TypeError, ValueError) as exc:
        # int(None) and other non-string/non-numeric values raise TypeError,
        # which previously escaped as an unhandled exception.
        raise VMRayParseError("Could not convert sample id to integer.") from exc

    # NOTE(review): the third positional argument is hard-coded to False;
    # presumably this is the certificate-verification switch - confirm
    # against VMRayRESTAPI before changing it.
    self.api = VMRayRESTAPI(url, api_key, False)

    self.ignore_analysis_finished = self._config_from_string(
        config.get("ignore_analysis_finished")
    )
    self._setup_optional_config(config)

    # Must run after the optional config: it may force include_all_artifacts.
    self.report_version = self._get_report_version()
|
|
|
|
|
|
|
|
def from_base64_string(
    self, config: Dict[str, Any], data: str, filename: str
) -> None:
    """Read a base64 encoded summary json.

    Decodes *data*, parses it as JSON, stores it as the report under the
    name *filename* and detects the report version from its top-level keys
    (``analysis_details`` -> v1, ``analysis_metadata`` -> v2).

    Raises:
        VMRayParseError: if *data* is not valid base64/JSON, is not a JSON
            object, or has neither of the expected top-level keys.
    """
    try:
        report = json.loads(base64.b64decode(data))
    except (TypeError, ValueError) as exc:
        # binascii.Error, json.JSONDecodeError and UnicodeDecodeError are all
        # ValueError subclasses; report them like any other bad upload.
        raise VMRayParseError("Uploaded file is not a summary.json") from exc

    # A JSON list/number/string would otherwise slip through the key checks
    # below or fail with an unrelated TypeError.
    if not isinstance(report, dict):
        raise VMRayParseError("Uploaded file is not a summary.json")

    self.report = report
    self.report_name = filename

    if "analysis_details" in self.report:
        self.report_version = ReportVersion.v1
    elif "analysis_metadata" in self.report:
        self.report_version = ReportVersion.v2
    else:
        raise VMRayParseError("Uploaded file is not a summary.json")

    self._setup_optional_config(config)
    self.include_report = bool(int(config.get("Attach Report", "0")))
|
|
|
|
|
|
|
|
def _setup_optional_config(self, config: Dict[str, Any]) -> None:
|
|
|
|
self.include_analysis_id = bool(int(config.get("Analysis ID", "1")))
|
|
|
|
self.include_vti_details = bool(int(config.get("VTI", "1")))
|
|
|
|
self.include_iocs = bool(int(config.get("IOCs", "1")))
|
|
|
|
self.include_all_artifacts = bool(int(config.get("Artifacts", "0")))
|
|
|
|
self.include_analysis_details = bool(int(config.get("Analysis Details", "1")))
|
|
|
|
|
|
|
|
self.use_misp_object = not self._config_from_string(
|
|
|
|
config.get("disable_misp_objects")
|
|
|
|
)
|
|
|
|
self.tag_objects = not self._config_from_string(config.get("disable_tags"))
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def _config_from_string(text: Optional[str]) -> bool:
|
|
|
|
if not text:
|
|
|
|
return False
|
|
|
|
|
|
|
|
text = text.lower()
|
|
|
|
return text in ("yes", "true")
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def _read_config_key(config: Dict[str, Any], key: str) -> str:
|
|
|
|
try:
|
|
|
|
value = config[key]
|
|
|
|
return value
|
|
|
|
except KeyError:
|
|
|
|
raise VMRayParseError(f"VMRay config is missing a value for `{key}`.")
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def _analysis_score_to_taxonomies(analysis_score: int) -> Optional[str]:
|
|
|
|
mapping = {
|
|
|
|
-1: "-1",
|
|
|
|
1: "1/5",
|
|
|
|
2: "2/5",
|
|
|
|
3: "3/5",
|
|
|
|
4: "4/5",
|
|
|
|
5: "5/5",
|
|
|
|
}
|
|
|
|
|
|
|
|
try:
|
|
|
|
return mapping[analysis_score]
|
|
|
|
except KeyError:
|
|
|
|
return None
|
|
|
|
|
|
|
|
def _get_report_version(self) -> ReportVersion:
    """Decide which report version to parse from the server's version.

    Queries ``/rest/system_info``. A major version of 4 or higher selects
    ``ReportVersion.v2``; anything lower selects ``ReportVersion.v1``.
    """
    info = self._vmary_api_call("/rest/system_info")
    major = info["version_major"]

    if major >= 4:
        return ReportVersion.v2

    # Version 3.2 and lower do not tag artifacts as IOCs,
    # so all artifacts are extracted for those servers.
    if major == 3 and info["version_minor"] < 3:
        self.include_all_artifacts = True

    return ReportVersion.v1
|
|
|
|
|
|
|
|
def _vmary_api_call(
    self,
    api_path: str,
    params: Optional[Dict[str, Any]] = None,
    raw_data: bool = False,
) -> Union[Dict[str, Any], bytes]:
    """Issue a GET request through the REST client.

    Args:
        api_path: path of the endpoint, e.g. ``/rest/system_info``.
        params: optional parameters forwarded to the client unchanged.
        raw_data: forwarded to the client unchanged.

    Returns:
        Whatever the client's ``call`` returns.

    Raises:
        VMRayParseError: if the client raises ``VMRayRESTAPIError`` or
            ``ValueError``; the original exception is kept as the cause.
    """
    try:
        return self.api.call("GET", api_path, params=params, raw_data=raw_data)
    except (VMRayRESTAPIError, ValueError) as exc:
        raise VMRayParseError(str(exc)) from exc
|
|
|
|
|
|
|
|
def _get_analysis(self) -> Dict[str, Any]:
|
|
|
|
return self._vmary_api_call(f"/rest/analysis/sample/{self.sample_id}")
|
|
|
|
|
|
|
|
def _analysis_finished(self) -> bool:
|
|
|
|
result = self._vmary_api_call(f"/rest/submission/sample/{self.sample_id}")
|
|
|
|
|
|
|
|
all_finished = []
|
|
|
|
for submission in result:
|
|
|
|
finished = submission["submission_finished"]
|
|
|
|
all_finished.append(finished)
|
|
|
|
|
|
|
|
return all(all_finished)
|
|
|
|
|
|
|
|
def _online_reports(self) -> Iterator[Tuple[ReportParser, str]]:
    """Yield ``(report parser, permalink)`` for each analysis of the sample.

    The sample is looked up first and, unless ``ignore_analysis_finished``
    is set, all of its submissions must be finished. Analyses whose summary
    cannot be loaded are skipped.

    Raises:
        VMRayParseError: if the sample lookup fails or not all submissions
            are finished yet.
    """
    # check if sample id exists
    try:
        self._vmary_api_call(f"/rest/sample/{self.sample_id}")
    except (VMRayParseError, VMRayRESTAPIError) as exc:
        # _vmary_api_call already converts VMRayRESTAPIError into
        # VMRayParseError, so catching only the former never triggered.
        raise VMRayParseError(
            f"Could not find sample id `{self.sample_id}` on server."
        ) from exc

    # check if all submission are finished
    if not self.ignore_analysis_finished and not self._analysis_finished():
        raise VMRayParseError(
            f"Not all analysis for `{self.sample_id}` are finished. "
            "Try it again in a few minutes."
        )

    analysis_results = self._get_analysis()
    for analysis in analysis_results:
        analysis_id = analysis["analysis_id"]
        permalink = analysis["analysis_webif_url"]

        # the summary json could not exist, due to a VM error
        try:
            if self.report_version == ReportVersion.v1:
                report_parser = Summary(api=self.api, analysis_id=analysis_id)
            else:
                report_parser = SummaryV2(api=self.api, analysis_id=analysis_id)
        except VMRayRESTAPIError:
            continue

        yield report_parser, permalink
|
|
|
|
|
|
|
|
def _offline_report(self) -> ReportParser:
    """Wrap the uploaded report in the parser matching its version."""
    if self.report_version == ReportVersion.v1:
        # For v1 reports the analysis id is fixed to 0.
        return Summary(report=self.report, analysis_id=0)

    reported_id = self.report["analysis_metadata"]["analysis_id"]
    return SummaryV2(report=self.report, analysis_id=reported_id)
|
|
|
|
|
|
|
|
def _reports(self) -> Iterator[Tuple[ReportParser, Optional[str]]]:
    """Yield ``(report parser, permalink)`` pairs to be parsed.

    With an uploaded report a single pair with permalink ``None`` is
    produced; otherwise the pairs come from the server.
    """
    if not self.report:
        yield from self._online_reports()
        return

    yield self._offline_report(), None
|
|
|
|
|
|
|
|
def _get_sample_verdict(self) -> Optional[str]:
|
|
|
|
if self.report:
|
|
|
|
if self.report_version == ReportVersion.v2:
|
|
|
|
verdict = SummaryV2.convert_verdict(
|
|
|
|
self.report["analysis_metadata"]["verdict"]
|
|
|
|
)
|
|
|
|
return verdict
|
|
|
|
return None
|
|
|
|
|
|
|
|
data = self._vmary_api_call(f"/rest/sample/{self.sample_id}")
|
|
|
|
if "sample_verdict" in data:
|
|
|
|
verdict = SummaryV2.convert_verdict(data["sample_verdict"])
|
|
|
|
return verdict
|
|
|
|
|
|
|
|
if "sample_severity" in data:
|
|
|
|
verdict = Summary.to_verdict(data["sample_severity"])
|
|
|
|
return verdict
|
|
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
def parse(self) -> None:
|
|
|
|
""" Convert analysis results to MISP Objects """
|
|
|
|
|
|
|
|
if self.use_misp_object:
|
|
|
|
self.parse_as_misp_object()
|
|
|
|
else:
|
|
|
|
self.parse_as_attributes()
|
|
|
|
|
|
|
|
def parse_as_attributes(self) -> None:
|
|
|
|
"""
|
|
|
|
Parse report as attributes
|
|
|
|
This method is compatible with the implementation provided
|
|
|
|
by Koen Van Impe
|
|
|
|
"""
|
|
|
|
|
|
|
|
for report, permalink in self._reports():
|
|
|
|
if report.is_static_report():
|
|
|
|
continue
|
|
|
|
|
|
|
|
if self.include_analysis_details:
|
|
|
|
for detail in report.details():
|
|
|
|
attr = Attribute(type="text", value=detail)
|
|
|
|
self.attributes.append(attr)
|
|
|
|
|
|
|
|
classifications = report.classifications()
|
|
|
|
if classifications:
|
|
|
|
attr = Attribute(type="text", value=classifications)
|
|
|
|
self.attributes.append(attr)
|
|
|
|
|
|
|
|
if self.include_vti_details:
|
|
|
|
for vti in report.vtis():
|
|
|
|
attr = Attribute(type="text", value=vti.operation)
|
|
|
|
self.attributes.append(attr)
|
|
|
|
|
|
|
|
for artifact in report.artifacts():
|
|
|
|
if self.include_all_artifacts or (
|
|
|
|
self.include_iocs and artifact.is_ioc
|
|
|
|
):
|
|
|
|
for attr in artifact.to_attributes():
|
|
|
|
self.attributes.append(attr)
|
|
|
|
|
|
|
|
if self.include_analysis_id and permalink:
|
|
|
|
attr = Attribute(type="link", value=permalink)
|
|
|
|
self.attributes.append(attr)
|
|
|
|
|
|
|
|
def parse_as_misp_object(self):
    """Parse the reports into MISP objects attached to ``self.event``.

    Each non-static report gets its own ``sandbox-report`` object. VTIs,
    artifacts and MITRE ATT&CK entries are collected across all reports
    without duplicates (equal artifacts are merged) and added afterwards,
    followed by the event-level verdict tag.
    """
    seen_mitre_attacks = []
    seen_vtis = []
    seen_artifacts = []

    # add sandbox signature
    sb_sig = MISPObject(name="sb-signature")
    sb_sig.add_attribute("software", "VMRay Platform")

    for report, permalink in self._reports():
        if report.is_static_report():
            continue

        # create sandbox object
        sandbox_obj = MISPObject(name="sandbox-report")
        sandbox_obj.add_attribute("on-premise-sandbox", "vmray")

        if permalink:
            sandbox_obj.add_attribute("permalink", permalink)

        if self.include_report and self.report:
            # attach the uploaded report itself, base64 encoded
            pretty_json = json.dumps(self.report, indent=2)
            report_data = base64.b64encode(pretty_json.encode("utf-8")).decode("utf-8")
            sandbox_obj.add_attribute(
                "sandbox-file", value=self.report_name, data=report_data
            )

        score = report.score()
        attr_score = sandbox_obj.add_attribute("score", score)
        if self.tag_objects:
            attr_score.add_tag(f'vmray:verdict="{score}"')

        sandbox_obj.add_attribute("sandbox-type", report.sandbox_type())

        classification_text = report.classifications()
        if classification_text:
            sandbox_obj.add_attribute("results", classification_text)

        self.event.add_object(sandbox_obj)

        if self.include_vti_details:
            for vti in report.vtis():
                if vti not in seen_vtis:
                    seen_vtis.append(vti)

        for artifact in report.artifacts():
            if not self.include_all_artifacts and not (
                self.include_iocs and artifact.is_ioc
            ):
                continue

            if artifact in seen_artifacts:
                # fold the duplicate into the artifact collected earlier
                known = seen_artifacts[seen_artifacts.index(artifact)]
                known.merge(artifact)
            else:
                seen_artifacts.append(artifact)

        for mitre_attack in report.mitre_attacks():
            if mitre_attack not in seen_mitre_attacks:
                seen_mitre_attacks.append(mitre_attack)

    # process VTI's
    for vti in seen_vtis:
        vti_text = f"{vti.category}: {vti.operation}. {vti.technique}"
        vti_attr = sb_sig.add_attribute("signature", value=vti_text)

        if not self.tag_objects:
            continue

        taxonomy = self._analysis_score_to_taxonomies(vti.score)
        if taxonomy:
            vti_attr.add_tag(f'vmray:vti_analysis_score="{taxonomy}"')

    self.event.add_object(sb_sig)

    # process artifacts
    for artifact in seen_artifacts:
        self.event.add_object(artifact.to_misp_object(self.tag_objects))

    # tag event with Mitre Att&ck
    for mitre_attack in seen_mitre_attacks:
        self.event.add_tag(mitre_attack.to_misp_galaxy())

    # tag event
    if self.tag_objects:
        verdict = self._get_sample_verdict()
        if verdict:
            self.event.add_tag(f'vmray:verdict="{verdict}"')
|
|
|
|
|
|
|
|
def to_json(self) -> Dict[str, Any]:
|
|
|
|
""" Convert parsed results into JSON """
|
|
|
|
|
|
|
|
if not self.use_misp_object:
|
|
|
|
results = []
|
|
|
|
|
|
|
|
# remove duplicates
|
|
|
|
for attribute in self.attributes:
|
|
|
|
if attribute not in results:
|
|
|
|
results.append(asdict(attribute))
|
|
|
|
|
|
|
|
# add attributes to event
|
|
|
|
for attribute in results:
|
|
|
|
self.event.add_attribute(**attribute)
|
|
|
|
|
|
|
|
self.event.run_expansions()
|
|
|
|
event = json.loads(self.event.to_json())
|
|
|
|
|
|
|
|
return {"results": event}
|