From 0eb209c7df012e07bdd91e3f71cc20b206ed4293 Mon Sep 17 00:00:00 2001 From: VVX7 Date: Fri, 8 May 2020 16:10:09 -0400 Subject: [PATCH 1/6] new: [dev] add microblog object tool --- pymisp/tools/microblogobject.py | 140 ++++++++++++++++++++++++++++++++ 1 file changed, 140 insertions(+) create mode 100644 pymisp/tools/microblogobject.py diff --git a/pymisp/tools/microblogobject.py b/pymisp/tools/microblogobject.py new file mode 100644 index 0000000..f0da44d --- /dev/null +++ b/pymisp/tools/microblogobject.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from pymisp.tools.abstractgenerator import AbstractMISPObjectGenerator + + +class MicroblogObject(AbstractMISPObjectGenerator): + + def __init__(self, parameters: dict, strict: bool = True, standalone: bool = True, **kwargs): + super(MicroblogObject, self).__init__('microblog', strict=strict, standalone=standalone, **kwargs) + self._parameters = parameters + self.generate_attributes() + + def generate_attributes(self): + # Raw post. + if self._parameters.get('post'): + self.add_attribute('post', value=self._parameters['post']) + + # Title of the post. + if self._parameters.get('title'): + self.add_attribute('title', value=self._parameters['title']) + + # Original link into the microblog post (Supposed harmless). + if self._parameters.get('link'): + self.add_attribute('link', value=self._parameters['link']) + + # Original URL location of the microblog post (potentially malicious. + if self._parameters.get('url'): + if type(self._parameters.get('url')) is list: + for i in self._parameters.get('url'): + self.add_attribute('url', value=i) + else: + self.add_attribute('url', value=self._parameters['url']) + + # Archive of the original document (Internet Archive, Archive.is, etc). + if self._parameters.get('archive'): + if type(self._parameters.get('archive')) is list: + for i in self._parameters.get('archive'): + self.add_attribute('archive', value=i) + else: + self.add_attribute('archive', value=self._parameters['archive']) + + # Display name of the account who posted the microblog. + if self._parameters.get('display-name'): + self.add_attribute('display-name', value=self._parameters['display-name']) + + # The user ID of the microblog this post replies to. + if self._parameters.get('in-reply-to-user-id'): + self.add_attribute('in-reply-to-user-id', value=self._parameters['in-reply-to-user-id']) + + # The microblog ID of the microblog this post replies to. + if self._parameters.get('in-reply-to-status-id'): + self.add_attribute('in-reply-to-status-id', value=self._parameters['in-reply-to-status-id']) + + # The user display name of the microblog this post replies to. + if self._parameters.get('in-reply-to-display-name'): + self.add_attribute('in-reply-to-display-name', value=self._parameters['in-reply-to-display-name']) + + # The language of the post. + if self._parameters.get('language'): + self.add_attribute('language', value=self._parameters['language'], disable_correlation=True) + + # TODO: handle attachments + # The microblog post file or screen capture. + # if self._parameters.get('attachment'): + # self.add_attribute('attachment', value=self._parameters['attachment']) + + # Type of the microblog post. + type_allowed_values = ["Twitter", "Facebook", "LinkedIn", "Reddit", "Google+", + "Instagram", "Forum", "Other"] + if self._parameters.get('type'): + if type(self._parameters.get('type')) is list: + for i in self._parameters.get('type'): + if i in type_allowed_values: + self.add_attribute('type', value=i) + else: + if self._parameters['type'] in type_allowed_values: + self.add_attribute('type', value=self._parameters['type']) + + # State of the microblog post. + type_allowed_values = ["Informative", "Malicious", "Misinformation", "Disinformation", "Unknown"] + if self._parameters.get('state'): + if type(self._parameters.get('state')) is list: + for i in self._parameters.get('state'): + if i in type_allowed_values: + self.add_attribute('state', value=i) + else: + if self._parameters['state'] in type_allowed_values: + self.add_attribute('state', value=self._parameters['state']) + + # Username who posted the microblog post (without the @ prefix). + if self._parameters.get('username'): + self.add_attribute('username', value=self._parameters['username']) + + # Is the username account verified by the operator of the microblog platform. + type_allowed_values = ["Verified", "Unverified", "Unknown"] + if self._parameters.get('verified-username'): + if type(self._parameters.get('verified-username')) is list: + for i in self._parameters.get('verified-username'): + if i in type_allowed_values: + self.add_attribute('verified-username', value=i) + else: + if self._parameters['verified-username'] in type_allowed_values: + self.add_attribute('verified-username', value=self._parameters['verified-username']) + + # embedded-link. + if self._parameters.get('embedded-link'): + if type(self._parameters.get('embedded-link')) is list: + for i in self._parameters.get('embedded-link'): + self.add_attribute('embedded-link', value=i) + else: + self.add_attribute('embedded-link', value=self._parameters['embedded-link']) + + # embedded-safe-link + if self._parameters.get('embedded-safe-link'): + if type(self._parameters.get('embedded-safe-link')) is list: + for i in self._parameters.get('embedded-safe-link'): + self.add_attribute('embedded-safe-link', value=i) + else: + self.add_attribute('embedded-safe-link', value=self._parameters['embedded-safe-link']) + + # Hashtag into the microblog post. + if self._parameters.get('hashtag'): + if type(self._parameters.get('hashtag')) is list: + for i in self._parameters.get('hashtag'): + self.add_attribute('hashtag', value=i) + else: + self.add_attribute('hashtag', value=self._parameters['hashtag']) + + # username quoted + if self._parameters.get('username-quoted'): + if type(self._parameters.get('username-quoted')) is list: + for i in self._parameters.get('username-quoted'): + self.add_attribute('username-quoted', value=i) + else: + self.add_attribute('username-quoted', value=self._parameters['username-quoted']) + + # twitter post id + if self._parameters.get('twitter-id'): + self.add_attribute('twitter-id', value=self._parameters['twitter-id']) \ No newline at end of file From de994fd944946cb7b78985ea5f52be8d8d50727d Mon Sep 17 00:00:00 2001 From: VVX7 Date: Fri, 8 May 2020 16:32:29 -0400 Subject: [PATCH 2/6] chg: [dev] change type() == list --- pymisp/tools/microblogobject.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pymisp/tools/microblogobject.py b/pymisp/tools/microblogobject.py index f0da44d..76f62c2 100644 --- a/pymisp/tools/microblogobject.py +++ b/pymisp/tools/microblogobject.py @@ -34,7 +34,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): # Archive of the original document (Internet Archive, Archive.is, etc). if self._parameters.get('archive'): - if type(self._parameters.get('archive')) is list: + if type(self._parameters.get('archive')) == list: for i in self._parameters.get('archive'): self.add_attribute('archive', value=i) else: @@ -69,7 +69,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): type_allowed_values = ["Twitter", "Facebook", "LinkedIn", "Reddit", "Google+", "Instagram", "Forum", "Other"] if self._parameters.get('type'): - if type(self._parameters.get('type')) is list: + if type(self._parameters.get('type')) == list: for i in self._parameters.get('type'): if i in type_allowed_values: self.add_attribute('type', value=i) @@ -80,7 +80,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): # State of the microblog post. type_allowed_values = ["Informative", "Malicious", "Misinformation", "Disinformation", "Unknown"] if self._parameters.get('state'): - if type(self._parameters.get('state')) is list: + if type(self._parameters.get('state')) == list: for i in self._parameters.get('state'): if i in type_allowed_values: self.add_attribute('state', value=i) @@ -92,10 +92,10 @@ class MicroblogObject(AbstractMISPObjectGenerator): if self._parameters.get('username'): self.add_attribute('username', value=self._parameters['username']) - # Is the username account verified by the operator of the microblog platform. + # == the username account verified by the operator of the microblog platform. type_allowed_values = ["Verified", "Unverified", "Unknown"] if self._parameters.get('verified-username'): - if type(self._parameters.get('verified-username')) is list: + if type(self._parameters.get('verified-username')) == list: for i in self._parameters.get('verified-username'): if i in type_allowed_values: self.add_attribute('verified-username', value=i) @@ -105,7 +105,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): # embedded-link. if self._parameters.get('embedded-link'): - if type(self._parameters.get('embedded-link')) is list: + if type(self._parameters.get('embedded-link')) == list: for i in self._parameters.get('embedded-link'): self.add_attribute('embedded-link', value=i) else: @@ -113,7 +113,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): # embedded-safe-link if self._parameters.get('embedded-safe-link'): - if type(self._parameters.get('embedded-safe-link')) is list: + if type(self._parameters.get('embedded-safe-link')) == list: for i in self._parameters.get('embedded-safe-link'): self.add_attribute('embedded-safe-link', value=i) else: @@ -121,7 +121,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): # Hashtag into the microblog post. if self._parameters.get('hashtag'): - if type(self._parameters.get('hashtag')) is list: + if type(self._parameters.get('hashtag')) == list: for i in self._parameters.get('hashtag'): self.add_attribute('hashtag', value=i) else: @@ -129,7 +129,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): # username quoted if self._parameters.get('username-quoted'): - if type(self._parameters.get('username-quoted')) is list: + if type(self._parameters.get('username-quoted')) == list: for i in self._parameters.get('username-quoted'): self.add_attribute('username-quoted', value=i) else: From 395d6aabac44e95b5d2f9879a9294292e52c37ec Mon Sep 17 00:00:00 2001 From: VVX7 Date: Fri, 8 May 2020 19:27:42 -0400 Subject: [PATCH 3/6] chg: [dev] fix abstract generator import. add logger. --- pymisp/tools/microblogobject.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pymisp/tools/microblogobject.py b/pymisp/tools/microblogobject.py index 76f62c2..8ea5a3c 100644 --- a/pymisp/tools/microblogobject.py +++ b/pymisp/tools/microblogobject.py @@ -1,8 +1,10 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -from pymisp.tools.abstractgenerator import AbstractMISPObjectGenerator +from .abstractgenerator import AbstractMISPObjectGenerator +import logging +logger = logging.getLogger('pymisp') class MicroblogObject(AbstractMISPObjectGenerator): From 759e9196deb5596b490e821958b521b6db2e0423 Mon Sep 17 00:00:00 2001 From: VVX7 Date: Fri, 8 May 2020 19:31:19 -0400 Subject: [PATCH 4/6] chg: [dev] use isinstance() type check. --- pymisp/tools/microblogobject.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pymisp/tools/microblogobject.py b/pymisp/tools/microblogobject.py index 8ea5a3c..865ac84 100644 --- a/pymisp/tools/microblogobject.py +++ b/pymisp/tools/microblogobject.py @@ -28,7 +28,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): # Original URL location of the microblog post (potentially malicious. if self._parameters.get('url'): - if type(self._parameters.get('url')) is list: + if isinstance(self._parameters.get('url'), list): for i in self._parameters.get('url'): self.add_attribute('url', value=i) else: @@ -36,7 +36,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): # Archive of the original document (Internet Archive, Archive.is, etc). if self._parameters.get('archive'): - if type(self._parameters.get('archive')) == list: + if isinstance(self._parameters.get('archive'), list): for i in self._parameters.get('archive'): self.add_attribute('archive', value=i) else: @@ -71,7 +71,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): type_allowed_values = ["Twitter", "Facebook", "LinkedIn", "Reddit", "Google+", "Instagram", "Forum", "Other"] if self._parameters.get('type'): - if type(self._parameters.get('type')) == list: + if isinstance(self._parameters.get('type'), list): for i in self._parameters.get('type'): if i in type_allowed_values: self.add_attribute('type', value=i) @@ -82,7 +82,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): # State of the microblog post. type_allowed_values = ["Informative", "Malicious", "Misinformation", "Disinformation", "Unknown"] if self._parameters.get('state'): - if type(self._parameters.get('state')) == list: + if isinstance(self._parameters.get('state'), list): for i in self._parameters.get('state'): if i in type_allowed_values: self.add_attribute('state', value=i) @@ -97,7 +97,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): # == the username account verified by the operator of the microblog platform. type_allowed_values = ["Verified", "Unverified", "Unknown"] if self._parameters.get('verified-username'): - if type(self._parameters.get('verified-username')) == list: + if isinstance(self._parameters.get('verified-username'), list): for i in self._parameters.get('verified-username'): if i in type_allowed_values: self.add_attribute('verified-username', value=i) @@ -107,7 +107,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): # embedded-link. if self._parameters.get('embedded-link'): - if type(self._parameters.get('embedded-link')) == list: + if isinstance(self._parameters.get('embedded-link'), list): for i in self._parameters.get('embedded-link'): self.add_attribute('embedded-link', value=i) else: @@ -115,7 +115,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): # embedded-safe-link if self._parameters.get('embedded-safe-link'): - if type(self._parameters.get('embedded-safe-link')) == list: + if isinstance(self._parameters.get('embedded-safe-link'), list): for i in self._parameters.get('embedded-safe-link'): self.add_attribute('embedded-safe-link', value=i) else: @@ -123,7 +123,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): # Hashtag into the microblog post. if self._parameters.get('hashtag'): - if type(self._parameters.get('hashtag')) == list: + if isinstance(self._parameters.get('hashtag'), list): for i in self._parameters.get('hashtag'): self.add_attribute('hashtag', value=i) else: @@ -131,7 +131,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): # username quoted if self._parameters.get('username-quoted'): - if type(self._parameters.get('username-quoted')) == list: + if isinstance(self._parameters.get('username-quoted'), list): for i in self._parameters.get('username-quoted'): self.add_attribute('username-quoted', value=i) else: From fff0caa330fa8d7e51d2c5af361899b8f71bbae7 Mon Sep 17 00:00:00 2001 From: VVX7 Date: Fri, 8 May 2020 19:54:12 -0400 Subject: [PATCH 5/6] chg: [dev] clean up how keys are accessed in self._parameters --- pymisp/tools/microblogobject.py | 41 ++++++++++++++++----------------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/pymisp/tools/microblogobject.py b/pymisp/tools/microblogobject.py index 865ac84..1ae6054 100644 --- a/pymisp/tools/microblogobject.py +++ b/pymisp/tools/microblogobject.py @@ -15,19 +15,19 @@ class MicroblogObject(AbstractMISPObjectGenerator): def generate_attributes(self): # Raw post. - if self._parameters.get('post'): + if 'post' in self._parameters: self.add_attribute('post', value=self._parameters['post']) # Title of the post. - if self._parameters.get('title'): + if 'title' in self._parameters: self.add_attribute('title', value=self._parameters['title']) # Original link into the microblog post (Supposed harmless). - if self._parameters.get('link'): + if 'link' in self._parameters: self.add_attribute('link', value=self._parameters['link']) # Original URL location of the microblog post (potentially malicious. - if self._parameters.get('url'): + if 'url' in self._parameters: if isinstance(self._parameters.get('url'), list): for i in self._parameters.get('url'): self.add_attribute('url', value=i) @@ -35,7 +35,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): self.add_attribute('url', value=self._parameters['url']) # Archive of the original document (Internet Archive, Archive.is, etc). - if self._parameters.get('archive'): + if 'archive' in self._parameters: if isinstance(self._parameters.get('archive'), list): for i in self._parameters.get('archive'): self.add_attribute('archive', value=i) @@ -43,34 +43,33 @@ class MicroblogObject(AbstractMISPObjectGenerator): self.add_attribute('archive', value=self._parameters['archive']) # Display name of the account who posted the microblog. - if self._parameters.get('display-name'): + if 'display-name' in self._parameters: self.add_attribute('display-name', value=self._parameters['display-name']) # The user ID of the microblog this post replies to. - if self._parameters.get('in-reply-to-user-id'): + if 'in-reply-to-user-id' in self._parameters: self.add_attribute('in-reply-to-user-id', value=self._parameters['in-reply-to-user-id']) # The microblog ID of the microblog this post replies to. - if self._parameters.get('in-reply-to-status-id'): + if 'in-reply-to-status-id' in self._parameters: self.add_attribute('in-reply-to-status-id', value=self._parameters['in-reply-to-status-id']) # The user display name of the microblog this post replies to. - if self._parameters.get('in-reply-to-display-name'): + if 'in-reply-to-display-name' in self._parameters: self.add_attribute('in-reply-to-display-name', value=self._parameters['in-reply-to-display-name']) # The language of the post. - if self._parameters.get('language'): + if 'language' in self._parameters: self.add_attribute('language', value=self._parameters['language'], disable_correlation=True) - # TODO: handle attachments # The microblog post file or screen capture. - # if self._parameters.get('attachment'): + # if 'attachment' in self._parameters: # self.add_attribute('attachment', value=self._parameters['attachment']) # Type of the microblog post. type_allowed_values = ["Twitter", "Facebook", "LinkedIn", "Reddit", "Google+", "Instagram", "Forum", "Other"] - if self._parameters.get('type'): + if 'type' in self._parameters: if isinstance(self._parameters.get('type'), list): for i in self._parameters.get('type'): if i in type_allowed_values: @@ -81,7 +80,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): # State of the microblog post. type_allowed_values = ["Informative", "Malicious", "Misinformation", "Disinformation", "Unknown"] - if self._parameters.get('state'): + if 'state' in self._parameters: if isinstance(self._parameters.get('state'), list): for i in self._parameters.get('state'): if i in type_allowed_values: @@ -91,12 +90,12 @@ class MicroblogObject(AbstractMISPObjectGenerator): self.add_attribute('state', value=self._parameters['state']) # Username who posted the microblog post (without the @ prefix). - if self._parameters.get('username'): + if 'username' in self._parameters: self.add_attribute('username', value=self._parameters['username']) # == the username account verified by the operator of the microblog platform. type_allowed_values = ["Verified", "Unverified", "Unknown"] - if self._parameters.get('verified-username'): + if 'verified-username' in self._parameters: if isinstance(self._parameters.get('verified-username'), list): for i in self._parameters.get('verified-username'): if i in type_allowed_values: @@ -106,7 +105,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): self.add_attribute('verified-username', value=self._parameters['verified-username']) # embedded-link. - if self._parameters.get('embedded-link'): + if 'embedded-link' in self._parameters: if isinstance(self._parameters.get('embedded-link'), list): for i in self._parameters.get('embedded-link'): self.add_attribute('embedded-link', value=i) @@ -114,7 +113,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): self.add_attribute('embedded-link', value=self._parameters['embedded-link']) # embedded-safe-link - if self._parameters.get('embedded-safe-link'): + if 'embedded-safe-link' in self._parameters: if isinstance(self._parameters.get('embedded-safe-link'), list): for i in self._parameters.get('embedded-safe-link'): self.add_attribute('embedded-safe-link', value=i) @@ -122,7 +121,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): self.add_attribute('embedded-safe-link', value=self._parameters['embedded-safe-link']) # Hashtag into the microblog post. - if self._parameters.get('hashtag'): + if 'hashtag' in self._parameters: if isinstance(self._parameters.get('hashtag'), list): for i in self._parameters.get('hashtag'): self.add_attribute('hashtag', value=i) @@ -130,7 +129,7 @@ class MicroblogObject(AbstractMISPObjectGenerator): self.add_attribute('hashtag', value=self._parameters['hashtag']) # username quoted - if self._parameters.get('username-quoted'): + if 'username-quoted' in self._parameters: if isinstance(self._parameters.get('username-quoted'), list): for i in self._parameters.get('username-quoted'): self.add_attribute('username-quoted', value=i) @@ -138,5 +137,5 @@ class MicroblogObject(AbstractMISPObjectGenerator): self.add_attribute('username-quoted', value=self._parameters['username-quoted']) # twitter post id - if self._parameters.get('twitter-id'): + if 'twitter-id' in self._parameters: self.add_attribute('twitter-id', value=self._parameters['twitter-id']) \ No newline at end of file From dcd1db8883eef7112350381ea3fca0685fa7dc6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Mon, 11 May 2020 15:40:20 +0200 Subject: [PATCH 6/6] fix: make flake8 happy --- pymisp/tools/microblogobject.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pymisp/tools/microblogobject.py b/pymisp/tools/microblogobject.py index 1ae6054..0b436d2 100644 --- a/pymisp/tools/microblogobject.py +++ b/pymisp/tools/microblogobject.py @@ -6,6 +6,7 @@ import logging logger = logging.getLogger('pymisp') + class MicroblogObject(AbstractMISPObjectGenerator): def __init__(self, parameters: dict, strict: bool = True, standalone: bool = True, **kwargs): @@ -138,4 +139,4 @@ class MicroblogObject(AbstractMISPObjectGenerator): # twitter post id if 'twitter-id' in self._parameters: - self.add_attribute('twitter-id', value=self._parameters['twitter-id']) \ No newline at end of file + self.add_attribute('twitter-id', value=self._parameters['twitter-id'])