2016-08-17 18:21:50 +02:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
2016-08-18 00:23:49 +02:00
import unittest
2016-08-17 18:21:50 +02:00
import requests_mock
import json
2016-09-27 19:47:22 +02:00
import os
2016-08-17 18:21:50 +02:00
2016-08-18 13:18:58 +02:00
import pymisp as pm
2016-08-17 18:21:50 +02:00
from pymisp import PyMISP
2016-12-03 17:29:41 +01:00
# from pymisp import NewEventError
2016-09-27 19:47:22 +02:00
from pymisp import MISPEvent
2017-08-25 17:14:33 +02:00
from pymisp import MISPEncode
2017-09-12 16:46:06 +02:00
2017-08-25 17:14:33 +02:00
from pymisp . tools import make_binary_objects
2016-08-17 18:21:50 +02:00
@requests_mock.Mocker ( )
2016-08-18 00:23:49 +02:00
class TestOffline ( unittest . TestCase ) :
2016-08-17 18:21:50 +02:00
def setUp ( self ) :
2016-08-18 00:40:30 +02:00
self . maxDiff = None
2016-08-17 18:21:50 +02:00
self . domain = ' http://misp.local/ '
self . key = ' aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa '
2016-12-02 16:53:45 +01:00
with open ( ' tests/misp_event.json ' , ' r ' ) as f :
self . event = { ' Event ' : json . load ( f ) }
with open ( ' tests/new_misp_event.json ' , ' r ' ) as f :
self . new_misp_event = { ' Event ' : json . load ( f ) }
2017-07-27 17:14:49 +02:00
self . resources_path = os . path . join ( os . path . abspath ( os . path . dirname ( __file__ ) ) , ' ../pymisp/data ' )
with open ( os . path . join ( self . resources_path , ' describeTypes.json ' ) , ' r ' ) as f :
2016-12-02 16:53:45 +01:00
self . types = json . load ( f )
with open ( ' tests/sharing_groups.json ' , ' r ' ) as f :
self . sharing_groups = json . load ( f )
2016-08-19 10:13:00 +02:00
self . auth_error_msg = { " name " : " Authentication failed. Please make sure you pass the API key of an API enabled user along in the Authorization header. " ,
" message " : " Authentication failed. Please make sure you pass the API key of an API enabled user along in the Authorization header. " ,
" url " : " \ /events \ /1 " }
2016-12-02 16:53:45 +01:00
with open ( ' tests/search_index_result.json ' , ' r ' ) as f :
self . search_index_result = json . load ( f )
2016-08-17 18:21:50 +02:00
def initURI ( self , m ) :
2016-08-19 10:13:00 +02:00
m . register_uri ( ' GET ' , self . domain + ' events/1 ' , json = self . auth_error_msg , status_code = 403 )
2017-01-27 13:17:38 +01:00
m . register_uri ( ' GET ' , self . domain + ' servers/getVersion.json ' , json = { " version " : " 2.4.62 " } )
m . register_uri ( ' GET ' , self . domain + ' servers/getPyMISPVersion.json ' , json = { " version " : " 2.4.62 " } )
2017-01-16 20:47:43 +01:00
m . register_uri ( ' GET ' , self . domain + ' sharing_groups.json ' , json = self . sharing_groups )
2016-08-17 18:21:50 +02:00
m . register_uri ( ' GET ' , self . domain + ' attributes/describeTypes.json ' , json = self . types )
m . register_uri ( ' GET ' , self . domain + ' events/2 ' , json = self . event )
2017-01-16 20:47:43 +01:00
m . register_uri ( ' POST ' , self . domain + ' events/5758ebf5-c898-48e6-9fe9-5665c0a83866 ' , json = self . event )
2016-08-18 00:23:49 +02:00
m . register_uri ( ' DELETE ' , self . domain + ' events/2 ' , json = { ' message ' : ' Event deleted. ' } )
m . register_uri ( ' DELETE ' , self . domain + ' events/3 ' , json = { ' errors ' : [ ' Invalid event ' ] , ' message ' : ' Invalid event ' , ' name ' : ' Invalid event ' , ' url ' : ' /events/3 ' } )
2016-08-18 00:40:30 +02:00
m . register_uri ( ' DELETE ' , self . domain + ' attributes/2 ' , json = { ' message ' : ' Attribute deleted. ' } )
2017-04-13 17:02:48 +02:00
m . register_uri ( ' POST ' , self . domain + ' events/index ' , json = self . search_index_result )
2016-08-17 18:21:50 +02:00
def test_getEvent ( self , m ) :
self . initURI ( m )
2016-08-26 18:23:02 +02:00
pymisp = PyMISP ( self . domain , self . key )
2016-08-18 00:23:49 +02:00
e1 = pymisp . get_event ( 2 )
e2 = pymisp . get ( 2 )
self . assertEqual ( e1 , e2 )
self . assertEqual ( self . event , e2 )
def test_updateEvent ( self , m ) :
self . initURI ( m )
2016-08-26 18:23:02 +02:00
pymisp = PyMISP ( self . domain , self . key )
2017-01-16 20:47:43 +01:00
e0 = pymisp . update_event ( ' 5758ebf5-c898-48e6-9fe9-5665c0a83866 ' , json . dumps ( self . event ) )
e1 = pymisp . update_event ( ' 5758ebf5-c898-48e6-9fe9-5665c0a83866 ' , self . event )
2016-08-18 00:23:49 +02:00
self . assertEqual ( e0 , e1 )
2016-08-18 00:40:30 +02:00
e2 = pymisp . update ( e0 )
2016-08-18 00:23:49 +02:00
self . assertEqual ( e1 , e2 )
self . assertEqual ( self . event , e2 )
def test_deleteEvent ( self , m ) :
self . initURI ( m )
2016-08-26 18:23:02 +02:00
pymisp = PyMISP ( self . domain , self . key )
2016-08-18 00:23:49 +02:00
d = pymisp . delete_event ( 2 )
self . assertEqual ( d , { ' message ' : ' Event deleted. ' } )
d = pymisp . delete_event ( 3 )
self . assertEqual ( d , { ' errors ' : [ ' Invalid event ' ] , ' message ' : ' Invalid event ' , ' name ' : ' Invalid event ' , ' url ' : ' /events/3 ' } )
def test_deleteAttribute ( self , m ) :
self . initURI ( m )
2016-08-26 18:23:02 +02:00
pymisp = PyMISP ( self . domain , self . key )
2016-08-18 00:23:49 +02:00
d = pymisp . delete_attribute ( 2 )
2016-08-18 00:40:30 +02:00
self . assertEqual ( d , { ' message ' : ' Attribute deleted. ' } )
2016-08-18 13:18:58 +02:00
def test_getVersions ( self , m ) :
self . initURI ( m )
2016-08-26 18:23:02 +02:00
pymisp = PyMISP ( self . domain , self . key )
2016-08-18 13:18:58 +02:00
api_version = pymisp . get_api_version ( )
self . assertEqual ( api_version , { ' version ' : pm . __version__ } )
server_version = pymisp . get_version ( )
2017-01-27 13:17:38 +01:00
self . assertEqual ( server_version , { " version " : " 2.4.62 " } )
2016-08-18 13:18:58 +02:00
def test_getSharingGroups ( self , m ) :
self . initURI ( m )
2016-08-26 18:23:02 +02:00
pymisp = PyMISP ( self . domain , self . key )
2016-08-18 13:18:58 +02:00
sharing_groups = pymisp . get_sharing_groups ( )
2017-01-11 20:50:58 +01:00
self . assertEqual ( sharing_groups [ 0 ] , self . sharing_groups [ ' response ' ] [ 0 ] )
2016-08-19 10:13:00 +02:00
def test_auth_error ( self , m ) :
self . initURI ( m )
2016-08-26 18:23:02 +02:00
pymisp = PyMISP ( self . domain , self . key )
2016-08-19 10:13:00 +02:00
error = pymisp . get ( 1 )
response = self . auth_error_msg
response [ ' errors ' ] = [ response [ ' message ' ] ]
2016-08-26 18:23:02 +02:00
self . assertEqual ( error , response )
def test_newEvent ( self , m ) :
2017-02-23 14:42:33 +01:00
error_empty_info = { ' message ' : ' The event could not be saved. ' , ' name ' : ' Add event failed. ' , ' errors ' : [ ' Error in info: Info cannot be empty. ' ] , ' url ' : ' /events/add ' }
2016-08-27 18:13:15 +02:00
error_empty_info_flatten = { u ' message ' : u ' The event could not be saved. ' , u ' name ' : u ' Add event failed. ' , u ' errors ' : [ u " Error in info: Info cannot be empty. " ] , u ' url ' : u ' /events/add ' }
2016-08-26 18:23:02 +02:00
self . initURI ( m )
pymisp = PyMISP ( self . domain , self . key )
m . register_uri ( ' POST ' , self . domain + ' events ' , json = error_empty_info )
2016-09-28 18:20:37 +02:00
# TODO Add test exception if info field isn't set
response = pymisp . new_event ( 0 , 1 , 0 , ' Foo ' )
2016-08-26 18:23:02 +02:00
self . assertEqual ( response , error_empty_info_flatten )
m . register_uri ( ' POST ' , self . domain + ' events ' , json = self . new_misp_event )
response = pymisp . new_event ( 0 , 1 , 0 , " This is a test. " , ' 2016-08-26 ' , False )
self . assertEqual ( response , self . new_misp_event )
2016-09-27 19:47:22 +02:00
def test_eventObject ( self , m ) :
self . initURI ( m )
pymisp = PyMISP ( self . domain , self . key )
2016-10-10 13:42:06 +02:00
misp_event = MISPEvent ( pymisp . describe_types )
2016-12-02 16:53:45 +01:00
with open ( ' tests/57c4445b-c548-4654-af0b-4be3950d210f.json ' , ' r ' ) as f :
misp_event . load ( f . read ( ) )
2017-09-12 16:46:06 +02:00
json . dumps ( misp_event , cls = MISPEncode )
2016-08-19 10:13:00 +02:00
2016-12-03 17:29:41 +01:00
def test_searchIndexByTagId ( self , m ) :
2016-12-02 16:52:00 +01:00
self . initURI ( m )
pymisp = PyMISP ( self . domain , self . key )
response = pymisp . search_index ( tag = " 1 " )
2016-12-03 17:29:41 +01:00
self . assertEqual ( response [ ' response ' ] , self . search_index_result )
def test_searchIndexByTagName ( self , m ) :
2016-12-02 18:52:56 +01:00
self . initURI ( m )
pymisp = PyMISP ( self . domain , self . key )
response = pymisp . search_index ( tag = ' ecsirt:malicious-code= " ransomware " ' )
2016-12-03 17:29:41 +01:00
self . assertEqual ( response [ ' response ' ] , self . search_index_result )
2016-12-01 10:49:12 +01:00
def test_addAttributes ( self , m ) :
class MockPyMISP ( PyMISP ) :
def _send_attributes ( self , event , attributes , proposal = False ) :
return len ( attributes )
self . initURI ( m )
p = MockPyMISP ( self . domain , self . key )
evt = p . get ( 1 )
2017-03-15 11:33:24 +01:00
self . assertEqual ( 3 , p . add_hashes ( evt , md5 = ' 68b329da9893e34099c7d8ad5cb9c940 ' ,
2017-07-27 17:14:49 +02:00
sha1 = ' adc83b19e793491b1c6ea0fd8b46cd9f32e592fc ' ,
sha256 = ' 01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b ' ,
filename = ' foobar.exe ' ) )
2017-03-15 11:33:24 +01:00
self . assertEqual ( 3 , p . add_hashes ( evt , md5 = ' 68b329da9893e34099c7d8ad5cb9c940 ' ,
2017-07-27 17:14:49 +02:00
sha1 = ' adc83b19e793491b1c6ea0fd8b46cd9f32e592fc ' ,
sha256 = ' 01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b ' ) )
2016-12-01 10:49:12 +01:00
p . av_detection_link ( evt , ' https://foocorp.com ' )
p . add_detection_name ( evt , ' WATERMELON ' )
p . add_filename ( evt , ' foobar.exe ' )
p . add_regkey ( evt , ' HKLM \\ Software \\ Microsoft \\ Outlook \\ Addins \\ foobar ' )
2016-12-03 17:29:41 +01:00
p . add_regkey ( evt , ' HKLM \\ Software \\ Microsoft \\ Outlook \\ Addins \\ foobar ' , rvalue = ' foobar ' )
2016-12-01 10:49:12 +01:00
regkeys = {
2016-12-03 17:29:41 +01:00
' HKLM \\ Software \\ Microsoft \\ Outlook \\ Addins \\ foo ' : None ,
' HKLM \\ Software \\ Microsoft \\ Outlook \\ Addins \\ bar ' : ' baz ' ,
' HKLM \\ Software \\ Microsoft \\ Outlook \\ Addins \\ bae ' : 0 ,
2016-12-01 10:49:12 +01:00
}
2016-12-01 14:26:59 +01:00
self . assertEqual ( 3 , p . add_regkeys ( evt , regkeys ) )
2016-12-01 10:49:12 +01:00
p . add_pattern ( evt , ' .*foobar.* ' , in_memory = True )
p . add_pattern ( evt , ' .*foobar.* ' , in_file = True )
self . assertRaises ( pm . PyMISPError , p . add_pattern , evt , ' .*foobar.* ' , in_memory = False , in_file = False )
p . add_pipe ( evt , ' foo ' )
p . add_pipe ( evt , ' \\ . \\ pipe \\ foo ' )
2017-03-15 11:33:24 +01:00
self . assertEqual ( 3 , p . add_pipe ( evt , [ ' foo ' , ' bar ' , ' baz ' ] ) )
self . assertEqual ( 3 , p . add_pipe ( evt , [ ' foo ' , ' bar ' , ' \\ . \\ pipe \\ baz ' ] ) )
2016-12-01 10:49:12 +01:00
p . add_mutex ( evt , ' foo ' )
2017-03-15 11:33:24 +01:00
self . assertEqual ( 1 , p . add_mutex ( evt , ' \\ BaseNamedObjects \\ foo ' ) )
self . assertEqual ( 3 , p . add_mutex ( evt , [ ' foo ' , ' bar ' , ' baz ' ] ) )
self . assertEqual ( 3 , p . add_mutex ( evt , [ ' foo ' , ' bar ' , ' \\ BaseNamedObjects \\ baz ' ] ) )
2016-12-01 10:49:12 +01:00
p . add_yara ( evt , ' rule Foo {} ' )
2017-03-15 11:33:24 +01:00
self . assertEqual ( 2 , p . add_yara ( evt , [ ' rule Foo {} ' , ' rule Bar {} ' ] ) )
2016-12-01 10:49:12 +01:00
p . add_ipdst ( evt , ' 1.2.3.4 ' )
2017-03-15 11:33:24 +01:00
self . assertEqual ( 2 , p . add_ipdst ( evt , [ ' 1.2.3.4 ' , ' 5.6.7.8 ' ] ) )
2016-12-01 10:49:12 +01:00
p . add_ipsrc ( evt , ' 1.2.3.4 ' )
2017-03-15 11:33:24 +01:00
self . assertEqual ( 2 , p . add_ipsrc ( evt , [ ' 1.2.3.4 ' , ' 5.6.7.8 ' ] ) )
2016-12-01 10:49:12 +01:00
p . add_hostname ( evt , ' a.foobar.com ' )
2017-03-15 11:33:24 +01:00
self . assertEqual ( 2 , p . add_hostname ( evt , [ ' a.foobar.com ' , ' a.foobaz.com ' ] ) )
2016-12-01 10:49:12 +01:00
p . add_domain ( evt , ' foobar.com ' )
2017-03-15 11:33:24 +01:00
self . assertEqual ( 2 , p . add_domain ( evt , [ ' foobar.com ' , ' foobaz.com ' ] ) )
2016-12-01 10:49:12 +01:00
p . add_domain_ip ( evt , ' foo.com ' , ' 1.2.3.4 ' )
2017-03-15 11:33:24 +01:00
self . assertEqual ( 2 , p . add_domain_ip ( evt , ' foo.com ' , [ ' 1.2.3.4 ' , ' 5.6.7.8 ' ] ) )
self . assertEqual ( 2 , p . add_domains_ips ( evt , { ' foo.com ' : ' 1.2.3.4 ' , ' bar.com ' : ' 4.5.6.7 ' } ) )
2016-12-01 10:49:12 +01:00
p . add_url ( evt , ' https://example.com ' )
2017-03-15 11:33:24 +01:00
self . assertEqual ( 2 , p . add_url ( evt , [ ' https://example.com ' , ' http://foo.com ' ] ) )
2016-12-01 10:49:12 +01:00
p . add_useragent ( evt , ' Mozilla ' )
2017-03-15 11:33:24 +01:00
self . assertEqual ( 2 , p . add_useragent ( evt , [ ' Mozilla ' , ' Godzilla ' ] ) )
2016-12-01 10:49:12 +01:00
p . add_traffic_pattern ( evt , ' blabla ' )
p . add_snort ( evt , ' blaba ' )
p . add_net_other ( evt , ' blabla ' )
p . add_email_src ( evt , ' foo@bar.com ' )
2016-12-03 17:29:41 +01:00
p . add_email_dst ( evt , ' foo@bar.com ' )
2016-12-01 10:49:12 +01:00
p . add_email_subject ( evt , ' you won the lottery ' )
p . add_email_attachment ( evt , ' foo.doc ' )
p . add_target_email ( evt , ' foo@bar.com ' )
p . add_target_user ( evt , ' foo ' )
2016-12-03 17:29:41 +01:00
p . add_target_machine ( evt , ' foobar ' )
p . add_target_org ( evt , ' foobar ' )
p . add_target_location ( evt , ' foobar ' )
2016-12-01 10:49:12 +01:00
p . add_target_external ( evt , ' foobar ' )
p . add_threat_actor ( evt , ' WATERMELON ' )
p . add_internal_link ( evt , ' foobar ' )
p . add_internal_comment ( evt , ' foobar ' )
p . add_internal_text ( evt , ' foobar ' )
p . add_internal_other ( evt , ' foobar ' )
2017-03-09 16:41:07 +01:00
p . add_attachment ( evt , " testFile " )
2017-08-25 17:14:33 +02:00
def make_objects ( self , path ) :
to_return = { ' objects ' : [ ] , ' references ' : [ ] }
fo , peo , seos = make_binary_objects ( path )
if seos :
for s in seos :
to_return [ ' objects ' ] . append ( s )
2017-08-28 19:01:53 +02:00
if s . ObjectReference :
to_return [ ' references ' ] + = s . ObjectReference
2017-08-25 17:14:33 +02:00
if peo :
to_return [ ' objects ' ] . append ( peo )
2017-08-28 19:01:53 +02:00
if peo . ObjectReference :
to_return [ ' references ' ] + = peo . ObjectReference
2017-08-25 17:14:33 +02:00
if fo :
to_return [ ' objects ' ] . append ( fo )
2017-08-28 19:01:53 +02:00
if fo . ObjectReference :
to_return [ ' references ' ] + = fo . ObjectReference
2017-08-25 17:14:33 +02:00
return json . dumps ( to_return , cls = MISPEncode )
def test_objects ( self , m ) :
paths = [ ' cmd.exe ' , ' tmux ' , ' MachO-OSX-x64-ls ' ]
2017-09-20 11:15:33 +02:00
try :
for path in paths :
json_blob = self . make_objects ( os . path . join ( ' tests ' ,
' viper-test-files ' , ' test_files ' , path ) )
except IOError : # Can be replaced with FileNotFoundError when support for python 2 is dropped
return unittest . SkipTest ( )
2017-08-25 17:14:33 +02:00
print ( json_blob )
2016-08-19 10:13:00 +02:00
if __name__ == ' __main__ ' :
unittest . main ( )