mirror of https://github.com/MISP/misp-modules
				
				
				
			
		
			
				
	
	
		
			1412 lines
		
	
	
		
			45 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			1412 lines
		
	
	
		
			45 KiB
		
	
	
	
		
			Python
		
	
	
| import base64
 | |
| import json
 | |
| import re
 | |
| 
 | |
| from abc import ABC, abstractmethod
 | |
| from dataclasses import asdict, dataclass, field
 | |
| from enum import Enum
 | |
| from pathlib import PureWindowsPath
 | |
| from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
 | |
| 
 | |
| from pymisp import MISPAttribute, MISPEvent, MISPObject
 | |
| 
 | |
| from .rest_api import VMRayRESTAPI, VMRayRESTAPIError
 | |
| 
 | |
| 
 | |
| USER_RE = re.compile(r".:.Users\\(.*?)\\", re.IGNORECASE)
 | |
| DOC_RE = re.compile(r".:.DOCUME~1.\\(.*?)\\", re.IGNORECASE)
 | |
| DOC_AND_SETTINGS_RE = re.compile(r".:.Documents and Settings\\(.*?)\\", re.IGNORECASE)
 | |
| USERPROFILES = [USER_RE, DOC_RE, DOC_AND_SETTINGS_RE]
 | |
| 
 | |
| 
 | |
| def classifications_to_str(classifications: List[str]) -> Optional[str]:
 | |
|     if classifications:
 | |
|         return "Classifications: " + ", ".join(classifications)
 | |
|     return None
 | |
| 
 | |
| 
 | |
| def merge_lists(target: List[Any], source: List[Any]):
 | |
|     return list({*target, *source})
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class Attribute:
 | |
|     type: str
 | |
|     value: str
 | |
|     category: Optional[str] = None
 | |
|     comment: Optional[str] = None
 | |
|     to_ids: bool = False
 | |
| 
 | |
|     def __eq__(self, other: Dict[str, Any]) -> bool:
 | |
|         return asdict(self) == other
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class Artifact:
 | |
|     is_ioc: bool
 | |
|     verdict: Optional[str]
 | |
| 
 | |
|     @abstractmethod
 | |
|     def to_attributes(self) -> Iterator[Attribute]:
 | |
|         raise NotImplementedError()
 | |
| 
 | |
|     @abstractmethod
 | |
|     def to_misp_object(self, tag: bool) -> MISPObject:
 | |
|         raise NotImplementedError()
 | |
| 
 | |
|     @abstractmethod
 | |
|     def merge(self, other: "Artifact") -> None:
 | |
|         raise NotImplementedError()
 | |
| 
 | |
|     @abstractmethod
 | |
|     def __eq__(self, other: "Artifact") -> bool:
 | |
|         raise NotImplementedError()
 | |
| 
 | |
|     def tag_artifact_attribute(self, attribute: MISPAttribute) -> None:
 | |
|         if self.is_ioc:
 | |
|             attribute.add_tag('vmray:artifact="IOC"')
 | |
| 
 | |
|         if self.verdict:
 | |
|             attribute.add_tag(f'vmray:verdict="{self.verdict}"')
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class DomainArtifact(Artifact):
 | |
|     domain: str
 | |
|     sources: List[str]
 | |
|     ips: List[str] = field(default_factory=list)
 | |
|     classifications: List[str] = field(default_factory=list)
 | |
| 
 | |
|     def to_attributes(self) -> Iterator[Attribute]:
 | |
|         value = self.domain
 | |
|         comment = ", ".join(self.sources) if self.sources else None
 | |
| 
 | |
|         attr = Attribute(type="domain", value=value, comment=comment)
 | |
|         yield attr
 | |
| 
 | |
|     def to_misp_object(self, tag: bool) -> MISPObject:
 | |
|         obj = MISPObject(name="domain-ip")
 | |
| 
 | |
|         classifications = classifications_to_str(self.classifications)
 | |
|         attr = obj.add_attribute(
 | |
|             "domain", value=self.domain, to_ids=self.is_ioc, comment=classifications
 | |
|         )
 | |
|         if tag:
 | |
|             self.tag_artifact_attribute(attr)
 | |
| 
 | |
|         for ip in self.ips:
 | |
|             obj.add_attribute("ip", value=ip, to_ids=self.is_ioc)
 | |
| 
 | |
|         return obj
 | |
| 
 | |
|     def merge(self, other: Artifact) -> None:
 | |
|         if not isinstance(other, DomainArtifact):
 | |
|             return
 | |
| 
 | |
|         self.ips = merge_lists(self.ips, other.ips)
 | |
|         self.classifications = merge_lists(self.classifications, other.classifications)
 | |
| 
 | |
|     def __eq__(self, other: Artifact) -> bool:
 | |
|         if not isinstance(other, DomainArtifact):
 | |
|             return NotImplemented
 | |
| 
 | |
|         return self.domain == other.domain
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class EmailArtifact(Artifact):
 | |
|     sender: Optional[str]
 | |
|     subject: Optional[str]
 | |
|     recipients: List[str] = field(default_factory=list)
 | |
|     classifications: List[str] = field(default_factory=list)
 | |
| 
 | |
|     def to_attributes(self) -> Iterator[Attribute]:
 | |
|         if self.sender:
 | |
|             classifications = classifications_to_str(self.classifications)
 | |
|             yield Attribute(
 | |
|                 type="email-src", value=self.sender, comment=classifications
 | |
|             )
 | |
| 
 | |
|         if self.subject:
 | |
|             yield Attribute(type="email-subject", value=self.subject, to_ids=False)
 | |
| 
 | |
|         for recipient in self.recipients:
 | |
|             yield Attribute(type="email-dst", value=recipient, to_ids=False)
 | |
| 
 | |
|     def to_misp_object(self, tag: bool) -> MISPObject:
 | |
|         obj = MISPObject(name="email")
 | |
| 
 | |
|         if self.sender:
 | |
|             classifications = classifications_to_str(self.classifications)
 | |
|             attr = obj.add_attribute(
 | |
|                 "from", value=self.sender, to_ids=self.is_ioc, comment=classifications
 | |
|             )
 | |
|             if tag:
 | |
|                 self.tag_artifact_attribute(attr)
 | |
| 
 | |
|         if self.subject:
 | |
|             obj.add_attribute("subject", value=self.subject, to_ids=False)
 | |
| 
 | |
|         for recipient in self.recipients:
 | |
|             obj.add_attribute("to", value=recipient, to_ids=False)
 | |
| 
 | |
|         return obj
 | |
| 
 | |
|     def merge(self, other: Artifact) -> None:
 | |
|         if not isinstance(other, EmailArtifact):
 | |
|             return
 | |
| 
 | |
|         self.recipients = merge_lists(self.recipients, other.recipients)
 | |
|         self.classifications = merge_lists(self.classifications, other.classifications)
 | |
| 
 | |
|     def __eq__(self, other: Artifact) -> bool:
 | |
|         if not isinstance(other, EmailArtifact):
 | |
|             return NotImplemented
 | |
| 
 | |
|         return self.sender == other.sender and self.subject == other.subject
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class FileArtifact(Artifact):
 | |
|     filenames: List[str]
 | |
|     operations: List[str]
 | |
|     md5: str
 | |
|     sha1: str
 | |
|     sha256: str
 | |
|     ssdeep: str
 | |
|     imphash: Optional[str]
 | |
|     classifications: List[str]
 | |
|     size: Optional[int]
 | |
|     mimetype: Optional[str] = None
 | |
| 
 | |
|     def to_attributes(self) -> Iterator[Attribute]:
 | |
|         operations = ", ".join(self.operations)
 | |
|         comment = f"File operations: {operations}"
 | |
| 
 | |
|         for filename in self.filenames:
 | |
|             attr = Attribute(type="filename", value=filename, comment=comment)
 | |
|             yield attr
 | |
| 
 | |
|         for hash_type in ("md5", "sha1", "sha256", "ssdeep", "imphash"):
 | |
|             for filename in self.filenames:
 | |
|                 value = getattr(self, hash_type)
 | |
|                 if value is not None:
 | |
|                     attr = Attribute(
 | |
|                         type=f"filename|{hash_type}",
 | |
|                         value=f"{filename}|{value}",
 | |
|                         category="Payload delivery",
 | |
|                         to_ids=True,
 | |
|                     )
 | |
|                     yield attr
 | |
| 
 | |
|     def to_misp_object(self, tag: bool) -> MISPObject:
 | |
|         obj = MISPObject(name="file")
 | |
| 
 | |
|         if self.size:
 | |
|             obj.add_attribute("size-in-bytes", value=self.size)
 | |
| 
 | |
|         classifications = classifications_to_str(self.classifications)
 | |
|         hashes = [
 | |
|             ("md5", self.md5),
 | |
|             ("sha1", self.sha1),
 | |
|             ("sha256", self.sha256),
 | |
|             ("ssdeep", self.ssdeep),
 | |
|         ]
 | |
|         for (key, value) in hashes:
 | |
|             if not value:
 | |
|                 continue
 | |
| 
 | |
|             attr = obj.add_attribute(
 | |
|                 key, value=value, to_ids=self.is_ioc, comment=classifications
 | |
|             )
 | |
| 
 | |
|             if tag:
 | |
|                 self.tag_artifact_attribute(attr)
 | |
| 
 | |
|         if self.mimetype:
 | |
|             obj.add_attribute("mimetype", value=self.mimetype, to_ids=False)
 | |
| 
 | |
|         operations = None
 | |
|         if self.operations:
 | |
|             operations = "Operations: " + ", ".join(self.operations)
 | |
| 
 | |
|         for filename in self.filenames:
 | |
|             filename = PureWindowsPath(filename)
 | |
|             obj.add_attribute("filename", value=filename.name, comment=operations)
 | |
| 
 | |
|             fullpath = str(filename)
 | |
|             for regex in USERPROFILES:
 | |
|                 fullpath = regex.sub(r"%USERPROFILE%\\", fullpath)
 | |
| 
 | |
|             obj.add_attribute("fullpath", fullpath)
 | |
| 
 | |
|         return obj
 | |
| 
 | |
|     def merge(self, other: Artifact) -> None:
 | |
|         if not isinstance(other, FileArtifact):
 | |
|             return
 | |
| 
 | |
|         self.filenames = merge_lists(self.filenames, other.filenames)
 | |
|         self.operations = merge_lists(self.operations, other.operations)
 | |
|         self.classifications = merge_lists(self.classifications, other.classifications)
 | |
| 
 | |
|     def __eq__(self, other: Artifact) -> bool:
 | |
|         if not isinstance(other, FileArtifact):
 | |
|             return NotImplemented
 | |
| 
 | |
|         return self.sha256 == other.sha256
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class IpArtifact(Artifact):
 | |
|     ip: str
 | |
|     sources: List[str]
 | |
|     classifications: List[str] = field(default_factory=list)
 | |
| 
 | |
|     def to_attributes(self) -> Iterator[Attribute]:
 | |
|         sources = ", ".join(self.sources)
 | |
|         comment = f"Found in: {sources}"
 | |
| 
 | |
|         attr = Attribute(type="ip-dst", value=self.ip, comment=comment)
 | |
|         yield attr
 | |
| 
 | |
|     def to_misp_object(self, tag: bool) -> MISPObject:
 | |
|         obj = MISPObject(name="ip-port")
 | |
| 
 | |
|         classifications = classifications_to_str(self.classifications)
 | |
|         attr = obj.add_attribute(
 | |
|             "ip", value=self.ip, comment=classifications, to_ids=self.is_ioc
 | |
|         )
 | |
|         if tag:
 | |
|             self.tag_artifact_attribute(attr)
 | |
| 
 | |
|         return obj
 | |
| 
 | |
|     def merge(self, other: Artifact) -> None:
 | |
|         if not isinstance(other, IpArtifact):
 | |
|             return
 | |
| 
 | |
|         self.sources = merge_lists(self.sources, other.sources)
 | |
|         self.classifications = merge_lists(self.classifications, other.classifications)
 | |
| 
 | |
|     def __eq__(self, other: Artifact) -> bool:
 | |
|         if not isinstance(other, IpArtifact):
 | |
|             return NotImplemented
 | |
| 
 | |
|         return self.ip == other.ip
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class MutexArtifact(Artifact):
 | |
|     name: str
 | |
|     operations: List[str]
 | |
|     classifications: List[str] = field(default_factory=list)
 | |
| 
 | |
|     def to_attributes(self) -> Iterator[Attribute]:
 | |
|         operations = ", ".join(self.operations)
 | |
|         comment = f"Operations: {operations}"
 | |
| 
 | |
|         attr = Attribute(type="mutex", value=self.name, comment=comment)
 | |
|         yield attr
 | |
| 
 | |
|     def to_misp_object(self, tag: bool) -> MISPObject:
 | |
|         obj = MISPObject(name="mutex")
 | |
| 
 | |
|         classifications = classifications_to_str(self.classifications)
 | |
|         attr = obj.add_attribute(
 | |
|             "name",
 | |
|             value=self.name,
 | |
|             category="External analysis",
 | |
|             to_ids=False,
 | |
|             comment=classifications,
 | |
|         )
 | |
|         if tag:
 | |
|             self.tag_artifact_attribute(attr)
 | |
| 
 | |
|         operations = None
 | |
|         if self.operations:
 | |
|             operations = "Operations: " + ", ".join(self.operations)
 | |
|         obj.add_attribute("description", value=operations, to_ids=False)
 | |
| 
 | |
|         return obj
 | |
| 
 | |
|     def merge(self, other: Artifact) -> None:
 | |
|         if not isinstance(other, MutexArtifact):
 | |
|             return
 | |
| 
 | |
|         self.operations = merge_lists(self.operations, other.operations)
 | |
|         self.classifications = merge_lists(self.classifications, other.classifications)
 | |
| 
 | |
|     def __eq__(self, other: Artifact) -> bool:
 | |
|         if not isinstance(other, MutexArtifact):
 | |
|             return NotImplemented
 | |
| 
 | |
|         return self.name == other.name
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class ProcessArtifact(Artifact):
 | |
|     filename: str
 | |
|     pid: Optional[int] = None
 | |
|     parent_pid: Optional[int] = None
 | |
|     cmd_line: Optional[str] = None
 | |
|     operations: List[str] = field(default_factory=list)
 | |
|     classifications: List[str] = field(default_factory=list)
 | |
| 
 | |
|     def to_attributes(self) -> Iterator[Attribute]:
 | |
|         process_desc = f"Process created: {self.filename}\nPID: {self.pid}"
 | |
|         classifications = classifications_to_str(self.classifications)
 | |
|         yield Attribute(type="text", value=process_desc, comment=classifications)
 | |
| 
 | |
|     def to_misp_object(self, tag: bool) -> MISPObject:
 | |
|         obj = MISPObject(name="process")
 | |
| 
 | |
|         if self.pid:
 | |
|             obj.add_attribute("pid", value=self.pid, category="External analysis")
 | |
| 
 | |
|         if self.parent_pid:
 | |
|             obj.add_attribute(
 | |
|                 "parent-pid", value=self.parent_pid, category="External analysis"
 | |
|             )
 | |
| 
 | |
|         classifications = classifications_to_str(self.classifications)
 | |
|         name_attr = obj.add_attribute(
 | |
|             "name", self.filename, category="External analysis", comment=classifications
 | |
|         )
 | |
| 
 | |
|         cmd_attr = obj.add_attribute("command-line", value=self.cmd_line)
 | |
| 
 | |
|         if tag:
 | |
|             self.tag_artifact_attribute(name_attr)
 | |
|             self.tag_artifact_attribute(cmd_attr)
 | |
| 
 | |
|         return obj
 | |
| 
 | |
|     def merge(self, other: Artifact) -> None:
 | |
|         if not isinstance(other, ProcessArtifact):
 | |
|             return
 | |
| 
 | |
|         self.operations = merge_lists(self.operations, other.operations)
 | |
|         self.classifications = merge_lists(self.classifications, other.classifications)
 | |
| 
 | |
|     def __eq__(self, other: Artifact) -> bool:
 | |
|         if not isinstance(other, ProcessArtifact):
 | |
|             return NotImplemented
 | |
| 
 | |
|         return self.filename == other.filename and self.cmd_line == other.cmd_line
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class RegistryArtifact(Artifact):
 | |
|     key: str
 | |
|     operations: List[str]
 | |
| 
 | |
|     def to_attributes(self) -> Iterator[Attribute]:
 | |
|         operations = ", ".join(self.operations)
 | |
|         comment = f"Operations: {operations}"
 | |
| 
 | |
|         attr = Attribute(type="regkey", value=self.key, comment=comment)
 | |
|         yield attr
 | |
| 
 | |
|     def to_misp_object(self, tag: bool) -> MISPObject:
 | |
|         obj = MISPObject(name="registry-key")
 | |
| 
 | |
|         operations = None
 | |
|         if self.operations:
 | |
|             operations = "Operations: " + ", ".join(self.operations)
 | |
| 
 | |
|         attr = obj.add_attribute(
 | |
|             "key", value=self.key, to_ids=self.is_ioc, comment=operations
 | |
|         )
 | |
|         if tag:
 | |
|             self.tag_artifact_attribute(attr)
 | |
| 
 | |
|         return obj
 | |
| 
 | |
|     def merge(self, other: Artifact) -> None:
 | |
|         if not isinstance(other, RegistryArtifact):
 | |
|             return
 | |
| 
 | |
|         self.operations = merge_lists(self.operations, other.operations)
 | |
| 
 | |
|     def __eq__(self, other: Artifact) -> bool:
 | |
|         if not isinstance(other, RegistryArtifact):
 | |
|             return NotImplemented
 | |
| 
 | |
|         return self.key == other.key
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class UrlArtifact(Artifact):
 | |
|     url: str
 | |
|     operations: List[str]
 | |
|     domain: Optional[str] = None
 | |
|     ips: List[str] = field(default_factory=list)
 | |
| 
 | |
|     def to_attributes(self) -> Iterator[Attribute]:
 | |
|         operations = ", ".join(self.operations)
 | |
|         comment = f"Operations: {operations}"
 | |
| 
 | |
|         attr = Attribute(type="url", value=self.url, comment=comment)
 | |
|         yield attr
 | |
| 
 | |
|     def to_misp_object(self, tag: bool) -> MISPObject:
 | |
|         obj = MISPObject(name="url")
 | |
| 
 | |
|         operations = None
 | |
|         if self.operations:
 | |
|             operations = "Operations: " + ", ".join(self.operations)
 | |
| 
 | |
|         attr = obj.add_attribute(
 | |
|             "url",
 | |
|             value=self.url,
 | |
|             comment=operations,
 | |
|             category="External analysis",
 | |
|             to_ids=False,
 | |
|         )
 | |
|         if tag:
 | |
|             self.tag_artifact_attribute(attr)
 | |
| 
 | |
|         if self.domain:
 | |
|             obj.add_attribute(
 | |
|                 "domain", self.domain, category="External analysis", to_ids=False
 | |
|             )
 | |
| 
 | |
|         for ip in self.ips:
 | |
|             obj.add_attribute("ip", ip, category="External analysis", to_ids=False)
 | |
| 
 | |
|         return obj
 | |
| 
 | |
|     def merge(self, other: Artifact) -> None:
 | |
|         if not isinstance(other, UrlArtifact):
 | |
|             return
 | |
| 
 | |
|         self.ips = merge_lists(self.ips, other.ips)
 | |
|         self.operations = merge_lists(self.operations, other.operations)
 | |
| 
 | |
|     def __eq__(self, other: Artifact) -> bool:
 | |
|         if not isinstance(other, UrlArtifact):
 | |
|             return NotImplemented
 | |
| 
 | |
|         return self.url == other.url and self.domain == other.domain
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class MitreAttack:
 | |
|     description: str
 | |
|     id: str
 | |
| 
 | |
|     def to_misp_galaxy(self) -> str:
 | |
|         return f'misp-galaxy:mitre-attack-pattern="{self.description} - {self.id}"'
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class VTI:
 | |
|     category: str
 | |
|     operation: str
 | |
|     technique: str
 | |
|     score: int
 | |
| 
 | |
| 
 | |
| class ReportVersion(Enum):
 | |
|     v1 = "v1"
 | |
|     v2 = "v2"
 | |
| 
 | |
| 
 | |
| class VMRayParseError(Exception):
 | |
|     pass
 | |
| 
 | |
| 
 | |
| class ReportParser(ABC):
 | |
|     @abstractmethod
 | |
|     def __init__(self, api: VMRayRESTAPI, analysis_id: int):
 | |
|         raise NotImplementedError()
 | |
| 
 | |
|     @abstractmethod
 | |
|     def is_static_report(self) -> bool:
 | |
|         raise NotImplementedError()
 | |
| 
 | |
|     @abstractmethod
 | |
|     def artifacts(self) -> Iterator[Artifact]:
 | |
|         raise NotImplementedError()
 | |
| 
 | |
|     @abstractmethod
 | |
|     def classifications(self) -> Optional[str]:
 | |
|         raise NotImplementedError()
 | |
| 
 | |
|     @abstractmethod
 | |
|     def details(self) -> Iterator[str]:
 | |
|         raise NotImplementedError()
 | |
| 
 | |
|     @abstractmethod
 | |
|     def mitre_attacks(self) -> Iterator[MitreAttack]:
 | |
|         raise NotImplementedError()
 | |
| 
 | |
|     @abstractmethod
 | |
|     def sandbox_type(self) -> str:
 | |
|         raise NotImplementedError()
 | |
| 
 | |
|     @abstractmethod
 | |
|     def score(self) -> str:
 | |
|         raise NotImplementedError()
 | |
| 
 | |
|     @abstractmethod
 | |
|     def vtis(self) -> Iterator[VTI]:
 | |
|         raise NotImplementedError()
 | |
| 
 | |
| 
 | |
| class Summary(ReportParser):
 | |
|     def __init__(
 | |
|         self, analysis_id: int, api: VMRayRESTAPI = None, report: Dict[str, Any] = None
 | |
|     ):
 | |
|         self.analysis_id = analysis_id
 | |
| 
 | |
|         if report:
 | |
|             self.report = report
 | |
|         else:
 | |
|             data = api.call(
 | |
|                 "GET",
 | |
|                 f"/rest/analysis/{analysis_id}/archive/logs/summary.json",
 | |
|                 raw_data=True,
 | |
|             )
 | |
|             self.report = json.load(data)
 | |
| 
 | |
|     @staticmethod
 | |
|     def to_verdict(score: Union[int, str]) -> Optional[str]:
 | |
|         if isinstance(score, int):
 | |
|             if 0 <= score <= 24:
 | |
|                 return "clean"
 | |
|             if 25 <= score <= 74:
 | |
|                 return "suspicious"
 | |
|             if 75 <= score <= 100:
 | |
|                 return "malicious"
 | |
|             return "n/a"
 | |
|         if isinstance(score, str):
 | |
|             score = score.lower()
 | |
|             if score in ("not_suspicious", "whitelisted"):
 | |
|                 return "clean"
 | |
|             if score == "blacklisted":
 | |
|                 return "malicious"
 | |
|             if score in ("not_available", "unknown"):
 | |
|                 return "n/a"
 | |
|             return score
 | |
|         return None
 | |
| 
 | |
|     def is_static_report(self) -> bool:
 | |
|         return self.report["vti"]["vti_rule_type"] == "Static"
 | |
| 
 | |
|     def artifacts(self) -> Iterator[Artifact]:
 | |
|         artifacts = self.report["artifacts"]
 | |
|         domains = artifacts.get("domains", [])
 | |
|         for domain in domains:
 | |
|             classifications = domain.get("classifications", [])
 | |
|             is_ioc = domain.get("ioc", False)
 | |
|             verdict = self.to_verdict(domain.get("severity"))
 | |
|             ips = domain.get("ip_addresses", [])
 | |
|             artifact = DomainArtifact(
 | |
|                 domain=domain["domain"],
 | |
|                 sources=domain["sources"],
 | |
|                 ips=ips,
 | |
|                 classifications=classifications,
 | |
|                 is_ioc=is_ioc,
 | |
|                 verdict=verdict,
 | |
|             )
 | |
|             yield artifact
 | |
| 
 | |
|         emails = artifacts.get("emails", [])
 | |
|         for email in emails:
 | |
|             sender = email.get("sender")
 | |
|             subject = email.get("subject")
 | |
|             verdict = self.to_verdict(email.get("severity"))
 | |
|             recipients = email.get("recipients", [])
 | |
|             classifications = email.get("classifications", [])
 | |
|             is_ioc = email.get("ioc", False)
 | |
| 
 | |
|             artifact = EmailArtifact(
 | |
|                 sender=sender,
 | |
|                 subject=subject,
 | |
|                 verdict=verdict,
 | |
|                 recipients=recipients,
 | |
|                 classifications=classifications,
 | |
|                 is_ioc=is_ioc,
 | |
|             )
 | |
|             yield artifact
 | |
| 
 | |
|         files = artifacts.get("files", [])
 | |
|         for file_ in files:
 | |
|             if file_["filename"] is None:
 | |
|                 continue
 | |
| 
 | |
|             filenames = [file_["filename"]]
 | |
|             if "filenames" in file_:
 | |
|                 filenames += file_["filenames"]
 | |
| 
 | |
|             hashes = file_["hashes"]
 | |
|             classifications = file_.get("classifications", [])
 | |
|             operations = file_.get("operations", [])
 | |
|             is_ioc = file_.get("ioc", False)
 | |
|             mimetype = file_.get("mime_type")
 | |
|             verdict = self.to_verdict(file_.get("severity"))
 | |
| 
 | |
|             for hash_dict in hashes:
 | |
|                 imp = hash_dict.get("imp_hash")
 | |
| 
 | |
|                 artifact = FileArtifact(
 | |
|                     filenames=filenames,
 | |
|                     imphash=imp,
 | |
|                     md5=hash_dict["md5_hash"],
 | |
|                     ssdeep=hash_dict["ssdeep_hash"],
 | |
|                     sha256=hash_dict["sha256_hash"],
 | |
|                     sha1=hash_dict["sha1_hash"],
 | |
|                     operations=operations,
 | |
|                     classifications=classifications,
 | |
|                     size=file_.get("file_size"),
 | |
|                     is_ioc=is_ioc,
 | |
|                     mimetype=mimetype,
 | |
|                     verdict=verdict,
 | |
|                 )
 | |
|                 yield artifact
 | |
| 
 | |
|         ips = artifacts.get("ips", [])
 | |
|         for ip in ips:
 | |
|             is_ioc = ip.get("ioc", False)
 | |
|             verdict = self.to_verdict(ip.get("severity"))
 | |
|             classifications = ip.get("classifications", [])
 | |
|             artifact = IpArtifact(
 | |
|                 ip=ip["ip_address"],
 | |
|                 sources=ip["sources"],
 | |
|                 classifications=classifications,
 | |
|                 verdict=verdict,
 | |
|                 is_ioc=is_ioc,
 | |
|             )
 | |
|             yield artifact
 | |
| 
 | |
|         mutexes = artifacts.get("mutexes", [])
 | |
|         for mutex in mutexes:
 | |
|             verdict = self.to_verdict(mutex.get("severity"))
 | |
|             is_ioc = mutex.get("ioc", False)
 | |
|             artifact = MutexArtifact(
 | |
|                 name=mutex["mutex_name"],
 | |
|                 operations=mutex["operations"],
 | |
|                 classifications=[],
 | |
|                 verdict=verdict,
 | |
|                 is_ioc=is_ioc,
 | |
|             )
 | |
|             yield artifact
 | |
| 
 | |
|         processes = artifacts.get("processes", [])
 | |
|         for process in processes:
 | |
|             classifications = process.get("classifications", [])
 | |
|             cmd_line = process.get("cmd_line")
 | |
|             name = process["image_name"]
 | |
|             verdict = self.to_verdict(process.get("severity"))
 | |
|             is_ioc = process.get("ioc", False)
 | |
| 
 | |
|             artifact = ProcessArtifact(
 | |
|                 filename=name,
 | |
|                 classifications=classifications,
 | |
|                 cmd_line=cmd_line,
 | |
|                 verdict=verdict,
 | |
|                 is_ioc=is_ioc,
 | |
|             )
 | |
|             yield artifact
 | |
| 
 | |
|         registry = artifacts.get("registry", [])
 | |
|         for reg in registry:
 | |
|             is_ioc = reg.get("ioc", False)
 | |
|             verdict = self.to_verdict(reg.get("severity"))
 | |
|             artifact = RegistryArtifact(
 | |
|                 key=reg["reg_key_name"],
 | |
|                 operations=reg["operations"],
 | |
|                 verdict=verdict,
 | |
|                 is_ioc=is_ioc,
 | |
|             )
 | |
|             yield artifact
 | |
| 
 | |
|         urls = artifacts.get("urls", [])
 | |
|         for url in urls:
 | |
|             ips = url.get("ip_addresses", [])
 | |
|             is_ioc = url.get("ioc", False)
 | |
|             verdict = self.to_verdict(url.get("severity"))
 | |
| 
 | |
|             artifact = UrlArtifact(
 | |
|                 url=url["url"],
 | |
|                 operations=url["operations"],
 | |
|                 ips=ips,
 | |
|                 is_ioc=is_ioc,
 | |
|                 verdict=verdict,
 | |
|             )
 | |
|             yield artifact
 | |
| 
 | |
|     def classifications(self) -> Optional[str]:
 | |
|         classifications = self.report["classifications"]
 | |
|         if classifications:
 | |
|             str_classifications = ", ".join(classifications)
 | |
|             return f"Classifications: {str_classifications}"
 | |
|         return None
 | |
| 
 | |
|     def details(self) -> Iterator[str]:
 | |
|         details = self.report["analysis_details"]
 | |
|         execution_successful = details["execution_successful"]
 | |
|         termination_reason = details["termination_reason"]
 | |
|         result = details["result_str"]
 | |
| 
 | |
|         if self.analysis_id == 0:
 | |
|             analysis = ""
 | |
|         else:
 | |
|             analysis = f" {self.analysis_id}"
 | |
| 
 | |
|         yield f"Analysis{analysis}: execution_successful: {execution_successful}"
 | |
|         yield f"Analysis{analysis}: termination_reason: {termination_reason}"
 | |
|         yield f"Analysis{analysis}: result: {result}"
 | |
| 
 | |
|     def mitre_attacks(self) -> Iterator[MitreAttack]:
 | |
|         mitre_attack = self.report["mitre_attack"]
 | |
|         techniques = mitre_attack.get("techniques", [])
 | |
| 
 | |
|         for technique in techniques:
 | |
|             mitre_attack = MitreAttack(
 | |
|                 description=technique["description"], id=technique["id"]
 | |
|             )
 | |
|             yield mitre_attack
 | |
| 
 | |
|     def sandbox_type(self) -> str:
 | |
|         vm_name = self.report["vm_and_analyzer_details"]["vm_name"]
 | |
|         sample_type = self.report["sample_details"]["sample_type"]
 | |
|         return f"{vm_name} | {sample_type}"
 | |
| 
 | |
|     def score(self) -> str:
 | |
|         vti_score = self.report["vti"]["vti_score"]
 | |
|         return self.to_verdict(vti_score)
 | |
| 
 | |
|     def vtis(self) -> Iterator[VTI]:
 | |
|         try:
 | |
|             vtis = self.report["vti"]["vti_rule_matches"]
 | |
|         except KeyError:
 | |
|             vtis = []
 | |
| 
 | |
|         for vti in vtis:
 | |
|             new_vti = VTI(
 | |
|                 category=vti["category_desc"],
 | |
|                 operation=vti["operation_desc"],
 | |
|                 technique=vti["technique_desc"],
 | |
|                 score=vti["rule_score"],
 | |
|             )
 | |
| 
 | |
|             yield new_vti
 | |
| 
 | |
| 
 | |
| class SummaryV2(ReportParser):
 | |
|     def __init__(
 | |
|         self, analysis_id: int, api: VMRayRESTAPI = None, report: Dict[str, Any] = None
 | |
|     ):
 | |
|         self.analysis_id = analysis_id
 | |
| 
 | |
|         if report:
 | |
|             self.report = report
 | |
|         else:
 | |
|             self.api = api
 | |
|             data = api.call(
 | |
|                 "GET",
 | |
|                 f"/rest/analysis/{analysis_id}/archive/logs/summary_v2.json",
 | |
|                 raw_data=True,
 | |
|             )
 | |
|             self.report = json.load(data)
 | |
| 
 | |
|     def _resolve_refs(
 | |
|         self, data: Union[List[Dict[str, Any]], Dict[str, Any]]
 | |
|     ) -> Iterator[Dict[str, Any]]:
 | |
|         if not data:
 | |
|             return []
 | |
| 
 | |
|         if isinstance(data, dict):
 | |
|             data = [data]
 | |
| 
 | |
|         for ref in data:
 | |
|             yield self._resolve_ref(ref)
 | |
| 
 | |
|     def _resolve_ref(self, data: Dict[str, Any]) -> Dict[str, Any]:
 | |
|         if data == {}:
 | |
|             return {}
 | |
| 
 | |
|         if data["_type"] != "reference" or data["source"] != "logs/summary_v2.json":
 | |
|             return {}
 | |
| 
 | |
|         resolved_ref = self.report
 | |
|         paths = data["path"]
 | |
|         for path_part in paths:
 | |
|             try:
 | |
|                 resolved_ref = resolved_ref[path_part]
 | |
|             except KeyError:
 | |
|                 return {}
 | |
| 
 | |
|         return resolved_ref
 | |
| 
 | |
|     @staticmethod
 | |
|     def convert_verdict(verdict: Optional[str]) -> str:
 | |
|         if verdict == "not_available" or not verdict:
 | |
|             return "n/a"
 | |
| 
 | |
|         return verdict
 | |
| 
 | |
|     def is_static_report(self) -> bool:
 | |
|         return self.report["vti"]["score_type"] == "static"
 | |
| 
 | |
|     def artifacts(self) -> Iterator[Artifact]:
 | |
|         artifacts = self.report["artifacts"]
 | |
| 
 | |
|         ref_domains = artifacts.get("ref_domains", [])
 | |
|         for domain in self._resolve_refs(ref_domains):
 | |
|             classifications = domain.get("classifications", [])
 | |
|             artifact = DomainArtifact(
 | |
|                 domain=domain["domain"],
 | |
|                 sources=domain["sources"],
 | |
|                 classifications=classifications,
 | |
|                 is_ioc=domain["is_ioc"],
 | |
|                 verdict=domain["verdict"],
 | |
|             )
 | |
| 
 | |
|             ref_ip_addresses = domain.get("ref_ip_addresses", [])
 | |
|             if not ref_ip_addresses:
 | |
|                 continue
 | |
| 
 | |
|             for ip_address in self._resolve_refs(ref_ip_addresses):
 | |
|                 artifact.ips.append(ip_address["ip_address"])
 | |
| 
 | |
|             yield artifact
 | |
| 
 | |
|         ref_emails = artifacts.get("ref_emails", [])
 | |
|         for email in self._resolve_refs(ref_emails):
 | |
|             sender = email.get("sender")
 | |
|             subject = email.get("subject")
 | |
|             recipients = email.get("recipients", [])
 | |
|             verdict = email["verdict"]
 | |
|             is_ioc = email["is_ioc"]
 | |
|             classifications = email.get("classifications", [])
 | |
| 
 | |
|             artifact = EmailArtifact(
 | |
|                 sender=sender,
 | |
|                 subject=subject,
 | |
|                 recipients=recipients,
 | |
|                 classifications=classifications,
 | |
|                 verdict=verdict,
 | |
|                 is_ioc=is_ioc,
 | |
|             )
 | |
| 
 | |
|             yield artifact
 | |
| 
 | |
|         ref_files = artifacts.get("ref_files", [])
 | |
|         for file_ in self._resolve_refs(ref_files):
 | |
|             filenames = []
 | |
| 
 | |
|             if "ref_filenames" in file_:
 | |
|                 for filename in self._resolve_refs(file_["ref_filenames"]):
 | |
|                     if not filename:
 | |
|                         continue
 | |
|                     filenames.append(filename["filename"])
 | |
| 
 | |
|             artifact = FileArtifact(
 | |
|                 operations=file_.get("operations", []),
 | |
|                 md5=file_["hash_values"]["md5"],
 | |
|                 sha1=file_["hash_values"]["sha1"],
 | |
|                 sha256=file_["hash_values"]["sha256"],
 | |
|                 ssdeep=file_["hash_values"]["ssdeep"],
 | |
|                 imphash=None,
 | |
|                 mimetype=file_.get("mime_type"),
 | |
|                 filenames=filenames,
 | |
|                 is_ioc=file_["is_ioc"],
 | |
|                 classifications=file_.get("classifications", []),
 | |
|                 size=file_["size"],
 | |
|                 verdict=file_["verdict"],
 | |
|             )
 | |
|             yield artifact
 | |
| 
 | |
|         ref_ip_addresses = artifacts.get("ref_ip_addresses", [])
 | |
|         for ip in self._resolve_refs(ref_ip_addresses):
 | |
|             classifications = ip.get("classifications", [])
 | |
|             verdict = ip["verdict"]
 | |
|             is_ioc = ip["is_ioc"]
 | |
|             artifact = IpArtifact(
 | |
|                 ip=ip["ip_address"],
 | |
|                 sources=ip["sources"],
 | |
|                 classifications=classifications,
 | |
|                 verdict=verdict,
 | |
|                 is_ioc=is_ioc,
 | |
|             )
 | |
|             yield artifact
 | |
| 
 | |
|         ref_mutexes = artifacts.get("ref_mutexes", [])
 | |
|         for mutex in self._resolve_refs(ref_mutexes):
 | |
|             is_ioc = mutex["is_ioc"]
 | |
|             classifications = mutex.get("classifications", [])
 | |
|             artifact = MutexArtifact(
 | |
|                 name=mutex["name"],
 | |
|                 operations=mutex["operations"],
 | |
|                 verdict=mutex["verdict"],
 | |
|                 classifications=classifications,
 | |
|                 is_ioc=is_ioc,
 | |
|             )
 | |
|             yield artifact
 | |
| 
 | |
|         ref_processes = artifacts.get("ref_processes", [])
 | |
|         for process in self._resolve_refs(ref_processes):
 | |
|             cmd_line = process.get("cmd_line")
 | |
|             classifications = process.get("classifications", [])
 | |
|             verdict = process.get("verdict")
 | |
|             artifact = ProcessArtifact(
 | |
|                 pid=process["os_pid"],
 | |
|                 parent_pid=process["origin_monitor_id"],
 | |
|                 filename=process["filename"],
 | |
|                 is_ioc=process["is_ioc"],
 | |
|                 cmd_line=cmd_line,
 | |
|                 classifications=classifications,
 | |
|                 verdict=verdict,
 | |
|             )
 | |
|             yield artifact
 | |
| 
 | |
|         ref_registry_records = artifacts.get("ref_registry_records", [])
 | |
|         for reg in self._resolve_refs(ref_registry_records):
 | |
|             artifact = RegistryArtifact(
 | |
|                 key=reg["reg_key_name"],
 | |
|                 operations=reg["operations"],
 | |
|                 is_ioc=reg["is_ioc"],
 | |
|                 verdict=reg["verdict"],
 | |
|             )
 | |
|             yield artifact
 | |
| 
 | |
|         url_refs = artifacts.get("ref_urls", [])
 | |
|         for url in self._resolve_refs(url_refs):
 | |
|             domain = None
 | |
|             ref_domain = url.get("ref_domain", {})
 | |
|             if ref_domain:
 | |
|                 domain = self._resolve_ref(ref_domain)["domain"]
 | |
| 
 | |
|             ips = []
 | |
|             ref_ip_addresses = url.get("ref_ip_addresses", [])
 | |
|             for ip_address in self._resolve_refs(ref_ip_addresses):
 | |
|                 ips.append(ip_address["ip_address"])
 | |
| 
 | |
|             artifact = UrlArtifact(
 | |
|                 url=url["url"],
 | |
|                 operations=url["operations"],
 | |
|                 is_ioc=url["is_ioc"],
 | |
|                 domain=domain,
 | |
|                 ips=ips,
 | |
|                 verdict=url["verdict"],
 | |
|             )
 | |
|             yield artifact
 | |
| 
 | |
|     def classifications(self) -> Optional[str]:
 | |
|         try:
 | |
|             classifications = ", ".join(self.report["classifications"])
 | |
|             return f"Classifications: {classifications}"
 | |
|         except KeyError:
 | |
|             return None
 | |
| 
 | |
|     def details(self) -> Iterator[str]:
 | |
|         details = self.report["analysis_metadata"]
 | |
|         is_execution_successful = details["is_execution_successful"]
 | |
|         termination_reason = details["termination_reason"]
 | |
|         result = details["result_str"]
 | |
| 
 | |
|         yield f"Analysis {self.analysis_id}: execution_successful: {is_execution_successful}"
 | |
|         yield f"Analysis {self.analysis_id}: termination_reason: {termination_reason}"
 | |
|         yield f"Analysis {self.analysis_id}: result: {result}"
 | |
| 
 | |
|     def mitre_attacks(self) -> Iterator[MitreAttack]:
 | |
|         mitre_attack = self.report["mitre_attack"]
 | |
|         techniques = mitre_attack["v4"]["techniques"]
 | |
| 
 | |
|         for technique_id, technique in techniques.items():
 | |
|             mitre_attack = MitreAttack(
 | |
|                 description=technique["description"],
 | |
|                 id=technique_id.replace("technique_", ""),
 | |
|             )
 | |
|             yield mitre_attack
 | |
| 
 | |
|     def sandbox_type(self) -> str:
 | |
|         vm_information = self.report["virtual_machine"]["description"]
 | |
|         sample_type = self.report["analysis_metadata"]["sample_type"]
 | |
|         return f"{vm_information} | {sample_type}"
 | |
| 
 | |
|     def score(self) -> str:
 | |
|         verdict = self.report["analysis_metadata"]["verdict"]
 | |
|         return self.convert_verdict(verdict)
 | |
| 
 | |
|     def vtis(self) -> Iterator[VTI]:
 | |
|         if "matches" not in self.report["vti"]:
 | |
|             return
 | |
| 
 | |
|         vti_matches = self.report["vti"]["matches"]
 | |
|         for vti in vti_matches.values():
 | |
|             new_vti = VTI(
 | |
|                 category=vti["category_desc"],
 | |
|                 operation=vti["operation_desc"],
 | |
|                 technique=vti["technique_desc"],
 | |
|                 score=vti["analysis_score"],
 | |
|             )
 | |
| 
 | |
|             yield new_vti
 | |
| 
 | |
| 
 | |
| class VMRayParser:
 | |
|     def __init__(self) -> None:
 | |
|         # required for api import
 | |
|         self.api: Optional[VMRayRESTAPI] = None
 | |
|         self.sample_id: Optional[int] = None
 | |
| 
 | |
|         # required for file import
 | |
|         self.report: Optional[Dict[str, Any]] = None
 | |
|         self.report_name: Optional[str] = None
 | |
|         self.include_report = False
 | |
| 
 | |
|         # required by API import and file import
 | |
|         self.report_version = ReportVersion.v2
 | |
| 
 | |
|         self.use_misp_object = True
 | |
|         self.ignore_analysis_finished = False
 | |
|         self.tag_objects = True
 | |
| 
 | |
|         self.include_analysis_id = True
 | |
|         self.include_vti_details = True
 | |
|         self.include_iocs = True
 | |
|         self.include_all_artifacts = False
 | |
|         self.include_analysis_details = True
 | |
| 
 | |
|         # a new event if we use misp objects
 | |
|         self.event = MISPEvent()
 | |
| 
 | |
|         # new attributes if we don't use misp objects
 | |
|         self.attributes: List[Attribute] = []
 | |
| 
 | |
|     def from_api(self, config: Dict[str, Any]) -> None:
 | |
|         url = self._read_config_key(config, "url")
 | |
|         api_key = self._read_config_key(config, "apikey")
 | |
| 
 | |
|         try:
 | |
|             self.sample_id = int(self._read_config_key(config, "Sample ID"))
 | |
|         except ValueError:
 | |
|             raise VMRayParseError("Could not convert sample id to integer.")
 | |
| 
 | |
|         self.api = VMRayRESTAPI(url, api_key, False)
 | |
| 
 | |
|         self.ignore_analysis_finished = self._config_from_string(config.get("ignore_analysis_finished"))
 | |
|         self._setup_optional_config(config)
 | |
|         self.report_version = self._get_report_version()
 | |
| 
 | |
|     def from_base64_string(
 | |
|         self, config: Dict[str, Any], data: str, filename: str
 | |
|     ) -> None:
 | |
|         """ read base64 encoded summary json """
 | |
| 
 | |
|         buffer = base64.b64decode(data)
 | |
|         self.report = json.loads(buffer)
 | |
|         self.report_name = filename
 | |
| 
 | |
|         if "analysis_details" in self.report:
 | |
|             self.report_version = ReportVersion.v1
 | |
|         elif "analysis_metadata" in self.report:
 | |
|             self.report_version = ReportVersion.v2
 | |
|         else:
 | |
|             raise VMRayParseError("Uploaded file is not a summary.json")
 | |
| 
 | |
|         self._setup_optional_config(config)
 | |
|         self.include_report = bool(int(config.get("Attach Report", "0")))
 | |
| 
 | |
|     def _setup_optional_config(self, config: Dict[str, Any]) -> None:
 | |
|         self.include_analysis_id = bool(int(config.get("Analysis ID", "1")))
 | |
|         self.include_vti_details = bool(int(config.get("VTI", "1")))
 | |
|         self.include_iocs = bool(int(config.get("IOCs", "1")))
 | |
|         self.include_all_artifacts = bool(int(config.get("Artifacts", "0")))
 | |
|         self.include_analysis_details = bool(int(config.get("Analysis Details", "1")))
 | |
| 
 | |
|         self.use_misp_object = not self._config_from_string(
 | |
|             config.get("disable_misp_objects")
 | |
|         )
 | |
|         self.tag_objects = not self._config_from_string(config.get("disable_tags"))
 | |
| 
 | |
|     @staticmethod
 | |
|     def _config_from_string(text: Optional[str]) -> bool:
 | |
|         if not text:
 | |
|             return False
 | |
| 
 | |
|         text = text.lower()
 | |
|         return text in ("yes", "true")
 | |
| 
 | |
|     @staticmethod
 | |
|     def _read_config_key(config: Dict[str, Any], key: str) -> str:
 | |
|         try:
 | |
|             value = config[key]
 | |
|             return value
 | |
|         except KeyError:
 | |
|             raise VMRayParseError(f"VMRay config is missing a value for `{key}`.")
 | |
| 
 | |
|     @staticmethod
 | |
|     def _analysis_score_to_taxonomies(analysis_score: int) -> Optional[str]:
 | |
|         mapping = {
 | |
|             -1: "-1",
 | |
|             1: "1/5",
 | |
|             2: "2/5",
 | |
|             3: "3/5",
 | |
|             4: "4/5",
 | |
|             5: "5/5",
 | |
|         }
 | |
| 
 | |
|         try:
 | |
|             return mapping[analysis_score]
 | |
|         except KeyError:
 | |
|             return None
 | |
| 
 | |
|     def _get_report_version(self) -> ReportVersion:
 | |
|         info = self._vmary_api_call("/rest/system_info")
 | |
|         if info["version_major"] >= 4:
 | |
|             return ReportVersion.v2
 | |
| 
 | |
|         # version 3.2 an less do not tag artifacts as ICOs
 | |
|         # so we extract all artifacts
 | |
|         if info["version_major"] == 3 and info["version_minor"] < 3:
 | |
|             self.include_all_artifacts = True
 | |
|         return ReportVersion.v1
 | |
| 
 | |
|     def _vmary_api_call(
 | |
|         self, api_path: str, params: Dict[str, Any] = None, raw_data: bool = False
 | |
|     ) -> Union[Dict[str, Any], bytes]:
 | |
|         try:
 | |
|             return self.api.call("GET", api_path, params=params, raw_data=raw_data)
 | |
|         except (VMRayRESTAPIError, ValueError) as exc:
 | |
|             raise VMRayParseError(str(exc))
 | |
| 
 | |
|     def _get_analysis(self) -> Dict[str, Any]:
 | |
|         return self._vmary_api_call(f"/rest/analysis/sample/{self.sample_id}")
 | |
| 
 | |
|     def _analysis_finished(self) -> bool:
 | |
|         result = self._vmary_api_call(f"/rest/submission/sample/{self.sample_id}")
 | |
| 
 | |
|         all_finished = []
 | |
|         for submission in result:
 | |
|             finished = submission["submission_finished"]
 | |
|             all_finished.append(finished)
 | |
| 
 | |
|         return all(all_finished)
 | |
| 
 | |
|     def _online_reports(self) -> Iterator[Tuple[ReportParser, str]]:
 | |
|         # check if sample id exists
 | |
|         try:
 | |
|             self._vmary_api_call(f"/rest/sample/{self.sample_id}")
 | |
|         except VMRayRESTAPIError:
 | |
|             raise VMRayParseError(
 | |
|                 f"Could not find sample id `{self.sample_id}` on server."
 | |
|             )
 | |
| 
 | |
|         # check if all submission are finished
 | |
|         if not self.ignore_analysis_finished and not self._analysis_finished():
 | |
|             raise VMRayParseError(
 | |
|                 f"Not all analysis for `{self.sample_id}` are finished. "
 | |
|                 "Try it again in a few minutes."
 | |
|             )
 | |
| 
 | |
|         analysis_results = self._get_analysis()
 | |
|         for analysis in analysis_results:
 | |
|             analysis_id = analysis["analysis_id"]
 | |
|             permalink = analysis["analysis_webif_url"]
 | |
| 
 | |
|             # the summary json could not exist, due to a VM error
 | |
|             try:
 | |
|                 if self.report_version == ReportVersion.v1:
 | |
|                     report_parser = Summary(api=self.api, analysis_id=analysis_id)
 | |
|                 else:
 | |
|                     report_parser = SummaryV2(api=self.api, analysis_id=analysis_id)
 | |
|             except VMRayRESTAPIError:
 | |
|                 continue
 | |
| 
 | |
|             yield report_parser, permalink
 | |
| 
 | |
|     def _offline_report(self) -> ReportParser:
 | |
|         if self.report_version == ReportVersion.v1:
 | |
|             analysis_id = 0
 | |
|             return Summary(report=self.report, analysis_id=analysis_id)
 | |
|         else:
 | |
|             analysis_id = self.report["analysis_metadata"]["analysis_id"]
 | |
|             return SummaryV2(report=self.report, analysis_id=analysis_id)
 | |
| 
 | |
|     def _reports(self) -> Iterator[Tuple[ReportParser, Optional[str]]]:
 | |
|         if self.report:
 | |
|             yield self._offline_report(), None
 | |
|         else:
 | |
|             yield from self._online_reports()
 | |
| 
 | |
|     def _get_sample_verdict(self) -> Optional[str]:
 | |
|         if self.report:
 | |
|             if self.report_version == ReportVersion.v2:
 | |
|                 verdict = SummaryV2.convert_verdict(
 | |
|                     self.report["analysis_metadata"]["verdict"]
 | |
|                 )
 | |
|                 return verdict
 | |
|             return None
 | |
| 
 | |
|         data = self._vmary_api_call(f"/rest/sample/{self.sample_id}")
 | |
|         if "sample_verdict" in data:
 | |
|             verdict = SummaryV2.convert_verdict(data["sample_verdict"])
 | |
|             return verdict
 | |
| 
 | |
|         if "sample_severity" in data:
 | |
|             verdict = Summary.to_verdict(data["sample_severity"])
 | |
|             return verdict
 | |
| 
 | |
|         return None
 | |
| 
 | |
|     def parse(self) -> None:
 | |
|         """ Convert analysis results to MISP Objects """
 | |
| 
 | |
|         if self.use_misp_object:
 | |
|             self.parse_as_misp_object()
 | |
|         else:
 | |
|             self.parse_as_attributes()
 | |
| 
 | |
|     def parse_as_attributes(self) -> None:
 | |
|         """
 | |
|         Parse report as attributes
 | |
|         This method is compatible with the implementation provided
 | |
|         by Koen Van Impe
 | |
|         """
 | |
| 
 | |
|         for report, permalink in self._reports():
 | |
|             if report.is_static_report():
 | |
|                 continue
 | |
| 
 | |
|             if self.include_analysis_details:
 | |
|                 for detail in report.details():
 | |
|                     attr = Attribute(type="text", value=detail)
 | |
|                     self.attributes.append(attr)
 | |
| 
 | |
|             classifications = report.classifications()
 | |
|             if classifications:
 | |
|                 attr = Attribute(type="text", value=classifications)
 | |
|                 self.attributes.append(attr)
 | |
| 
 | |
|             if self.include_vti_details:
 | |
|                 for vti in report.vtis():
 | |
|                     attr = Attribute(type="text", value=vti.operation)
 | |
|                     self.attributes.append(attr)
 | |
| 
 | |
|             for artifact in report.artifacts():
 | |
|                 if self.include_all_artifacts or (
 | |
|                     self.include_iocs and artifact.is_ioc
 | |
|                 ):
 | |
|                     for attr in artifact.to_attributes():
 | |
|                         self.attributes.append(attr)
 | |
| 
 | |
|             if self.include_analysis_id and permalink:
 | |
|                 attr = Attribute(type="link", value=permalink)
 | |
|                 self.attributes.append(attr)
 | |
| 
 | |
|     def parse_as_misp_object(self):
 | |
|         mitre_attacks = []
 | |
|         vtis = []
 | |
|         artifacts = []
 | |
| 
 | |
|         # add sandbox signature
 | |
|         sb_sig = MISPObject(name="sb-signature")
 | |
|         sb_sig.add_attribute("software", "VMRay Platform")
 | |
| 
 | |
|         for report, permalink in self._reports():
 | |
|             if report.is_static_report():
 | |
|                 continue
 | |
| 
 | |
|             # create sandbox object
 | |
|             obj = MISPObject(name="sandbox-report")
 | |
|             obj.add_attribute("on-premise-sandbox", "vmray")
 | |
| 
 | |
|             if permalink:
 | |
|                 obj.add_attribute("permalink", permalink)
 | |
| 
 | |
|             if self.include_report and self.report:
 | |
|                 report_data = base64.b64encode(
 | |
|                     json.dumps(self.report, indent=2).encode("utf-8")
 | |
|                 ).decode("utf-8")
 | |
|                 obj.add_attribute(
 | |
|                     "sandbox-file", value=self.report_name, data=report_data
 | |
|                 )
 | |
| 
 | |
|             score = report.score()
 | |
|             attr_score = obj.add_attribute("score", score)
 | |
| 
 | |
|             if self.tag_objects:
 | |
|                 attr_score.add_tag(f'vmray:verdict="{score}"')
 | |
| 
 | |
|             sandbox_type = report.sandbox_type()
 | |
|             obj.add_attribute("sandbox-type", sandbox_type)
 | |
| 
 | |
|             classifications = report.classifications()
 | |
|             if classifications:
 | |
|                 obj.add_attribute("results", classifications)
 | |
| 
 | |
|             self.event.add_object(obj)
 | |
| 
 | |
|             if self.include_vti_details:
 | |
|                 for vti in report.vtis():
 | |
|                     if vti not in vtis:
 | |
|                         vtis.append(vti)
 | |
| 
 | |
|             for artifact in report.artifacts():
 | |
|                 if self.include_all_artifacts or (
 | |
|                     self.include_iocs and artifact.is_ioc
 | |
|                 ):
 | |
|                     if artifact not in artifacts:
 | |
|                         artifacts.append(artifact)
 | |
|                     else:
 | |
|                         idx = artifacts.index(artifact)
 | |
|                         dup = artifacts[idx]
 | |
|                         dup.merge(artifact)
 | |
| 
 | |
|             for mitre_attack in report.mitre_attacks():
 | |
|                 if mitre_attack not in mitre_attacks:
 | |
|                     mitre_attacks.append(mitre_attack)
 | |
| 
 | |
|         # process VTI's
 | |
|         for vti in vtis:
 | |
|             vti_text = f"{vti.category}: {vti.operation}. {vti.technique}"
 | |
|             vti_attr = sb_sig.add_attribute("signature", value=vti_text)
 | |
| 
 | |
|             if self.tag_objects:
 | |
|                 value = self._analysis_score_to_taxonomies(vti.score)
 | |
|                 if value:
 | |
|                     vti_attr.add_tag(f'vmray:vti_analysis_score="{value}"')
 | |
| 
 | |
|         self.event.add_object(sb_sig)
 | |
| 
 | |
|         # process artifacts
 | |
|         for artifact in artifacts:
 | |
|             artifact_obj = artifact.to_misp_object(self.tag_objects)
 | |
|             self.event.add_object(artifact_obj)
 | |
| 
 | |
|         # tag event with Mitre Att&ck
 | |
|         for mitre_attack in mitre_attacks:
 | |
|             self.event.add_tag(mitre_attack.to_misp_galaxy())
 | |
| 
 | |
|         # tag event
 | |
|         if self.tag_objects:
 | |
|             verdict = self._get_sample_verdict()
 | |
|             if verdict:
 | |
|                 self.event.add_tag(f'vmray:verdict="{verdict}"')
 | |
| 
 | |
|     def to_json(self) -> Dict[str, Any]:
 | |
|         """ Convert parsed results into JSON """
 | |
| 
 | |
|         if not self.use_misp_object:
 | |
|             results = []
 | |
| 
 | |
|             # remove duplicates
 | |
|             for attribute in self.attributes:
 | |
|                 if attribute not in results:
 | |
|                     results.append(asdict(attribute))
 | |
| 
 | |
|             # add attributes to event
 | |
|             for attribute in results:
 | |
|                 self.event.add_attribute(**attribute)
 | |
| 
 | |
|         self.event.run_expansions()
 | |
|         event = json.loads(self.event.to_json())
 | |
| 
 | |
|         return {"results": event}
 |