mirror of https://github.com/MISP/PyMISP
fix: Make flake8 happy
parent
14d278fff2
commit
35257e538d
|
@ -216,15 +216,15 @@ class PyMISP:
|
||||||
return self._check_json_response(response)
|
return self._check_json_response(response)
|
||||||
|
|
||||||
def server_settings(self) -> Dict:
|
def server_settings(self) -> Dict:
|
||||||
response = self._prepare_request('GET', f'/servers/serverSettings')
|
response = self._prepare_request('GET', '/servers/serverSettings')
|
||||||
return self._check_json_response(response)
|
return self._check_json_response(response)
|
||||||
|
|
||||||
def restart_workers(self) -> Dict:
|
def restart_workers(self) -> Dict:
|
||||||
response = self._prepare_request('POST', f'/servers/restartWorkers')
|
response = self._prepare_request('POST', '/servers/restartWorkers')
|
||||||
return self._check_json_response(response)
|
return self._check_json_response(response)
|
||||||
|
|
||||||
def db_schema_diagnostic(self) -> Dict:
|
def db_schema_diagnostic(self) -> Dict:
|
||||||
response = self._prepare_request('GET', f'/servers/dbSchemaDiagnostic')
|
response = self._prepare_request('GET', '/servers/dbSchemaDiagnostic')
|
||||||
return self._check_json_response(response)
|
return self._check_json_response(response)
|
||||||
|
|
||||||
def toggle_global_pythonify(self) -> None:
|
def toggle_global_pythonify(self) -> None:
|
||||||
|
@ -412,7 +412,7 @@ class PyMISP:
|
||||||
# ## BEGIN Attribute ###
|
# ## BEGIN Attribute ###
|
||||||
|
|
||||||
def attributes(self, pythonify: bool=False) -> Union[Dict, List[MISPAttribute]]:
|
def attributes(self, pythonify: bool=False) -> Union[Dict, List[MISPAttribute]]:
|
||||||
r = self._prepare_request('GET', f'attributes/index')
|
r = self._prepare_request('GET', 'attributes/index')
|
||||||
attributes_r = self._check_json_response(r)
|
attributes_r = self._check_json_response(r)
|
||||||
if not (self.global_pythonify or pythonify) or 'errors' in attributes_r:
|
if not (self.global_pythonify or pythonify) or 'errors' in attributes_r:
|
||||||
return attributes_r
|
return attributes_r
|
||||||
|
@ -511,7 +511,7 @@ class PyMISP:
|
||||||
event_id = get_uuid_or_id_from_abstract_misp(event)
|
event_id = get_uuid_or_id_from_abstract_misp(event)
|
||||||
r = self._prepare_request('GET', f'shadow_attributes/index/{event_id}')
|
r = self._prepare_request('GET', f'shadow_attributes/index/{event_id}')
|
||||||
else:
|
else:
|
||||||
r = self._prepare_request('GET', f'shadow_attributes')
|
r = self._prepare_request('GET', 'shadow_attributes')
|
||||||
attribute_proposals = self._check_json_response(r)
|
attribute_proposals = self._check_json_response(r)
|
||||||
if not (self.global_pythonify or pythonify) or 'errors' in attribute_proposals:
|
if not (self.global_pythonify or pythonify) or 'errors' in attribute_proposals:
|
||||||
return attribute_proposals
|
return attribute_proposals
|
||||||
|
@ -618,7 +618,7 @@ class PyMISP:
|
||||||
r = self._prepare_request('POST', f'sightings/add/{attribute_id}', data=sighting)
|
r = self._prepare_request('POST', f'sightings/add/{attribute_id}', data=sighting)
|
||||||
else:
|
else:
|
||||||
# Either the ID/UUID is in the sighting, or we want to add a sighting on all the attributes with the given value
|
# Either the ID/UUID is in the sighting, or we want to add a sighting on all the attributes with the given value
|
||||||
r = self._prepare_request('POST', f'sightings/add', data=sighting)
|
r = self._prepare_request('POST', 'sightings/add', data=sighting)
|
||||||
new_sighting = self._check_json_response(r)
|
new_sighting = self._check_json_response(r)
|
||||||
if not (self.global_pythonify or pythonify) or 'errors' in new_sighting:
|
if not (self.global_pythonify or pythonify) or 'errors' in new_sighting:
|
||||||
return new_sighting
|
return new_sighting
|
||||||
|
@ -1090,7 +1090,7 @@ class PyMISP:
|
||||||
|
|
||||||
def import_server(self, server: MISPServer, pythonify: bool=False) -> Union[Dict, MISPServer]:
|
def import_server(self, server: MISPServer, pythonify: bool=False) -> Union[Dict, MISPServer]:
|
||||||
"""Import a sync server config received from get_sync_config"""
|
"""Import a sync server config received from get_sync_config"""
|
||||||
r = self._prepare_request('POST', f'servers/import', data=server)
|
r = self._prepare_request('POST', 'servers/import', data=server)
|
||||||
server_j = self._check_json_response(r)
|
server_j = self._check_json_response(r)
|
||||||
if not (self.global_pythonify or pythonify) or 'errors' in server_j:
|
if not (self.global_pythonify or pythonify) or 'errors' in server_j:
|
||||||
return server_j
|
return server_j
|
||||||
|
@ -1101,7 +1101,7 @@ class PyMISP:
|
||||||
def add_server(self, server: MISPServer, pythonify: bool=False) -> Union[Dict, MISPServer]:
|
def add_server(self, server: MISPServer, pythonify: bool=False) -> Union[Dict, MISPServer]:
|
||||||
"""Add a server to synchronise with.
|
"""Add a server to synchronise with.
|
||||||
Note: You probably want to use ExpandedPyMISP.get_sync_config and ExpandedPyMISP.import_server instead"""
|
Note: You probably want to use ExpandedPyMISP.get_sync_config and ExpandedPyMISP.import_server instead"""
|
||||||
r = self._prepare_request('POST', f'servers/add', data=server)
|
r = self._prepare_request('POST', 'servers/add', data=server)
|
||||||
server_j = self._check_json_response(r)
|
server_j = self._check_json_response(r)
|
||||||
if not (self.global_pythonify or pythonify) or 'errors' in server_j:
|
if not (self.global_pythonify or pythonify) or 'errors' in server_j:
|
||||||
return server_j
|
return server_j
|
||||||
|
@ -1177,7 +1177,7 @@ class PyMISP:
|
||||||
|
|
||||||
def add_sharing_group(self, sharing_group: MISPSharingGroup, pythonify: bool=False) -> Union[Dict, MISPSharingGroup]:
|
def add_sharing_group(self, sharing_group: MISPSharingGroup, pythonify: bool=False) -> Union[Dict, MISPSharingGroup]:
|
||||||
"""Add a new sharing group"""
|
"""Add a new sharing group"""
|
||||||
r = self._prepare_request('POST', f'sharing_groups/add', data=sharing_group)
|
r = self._prepare_request('POST', 'sharing_groups/add', data=sharing_group)
|
||||||
sharing_group_j = self._check_json_response(r)
|
sharing_group_j = self._check_json_response(r)
|
||||||
if not (self.global_pythonify or pythonify) or 'errors' in sharing_group_j:
|
if not (self.global_pythonify or pythonify) or 'errors' in sharing_group_j:
|
||||||
return sharing_group_j
|
return sharing_group_j
|
||||||
|
@ -1271,7 +1271,7 @@ class PyMISP:
|
||||||
|
|
||||||
def add_organisation(self, organisation: MISPOrganisation, pythonify: bool=False) -> Union[Dict, MISPOrganisation]:
|
def add_organisation(self, organisation: MISPOrganisation, pythonify: bool=False) -> Union[Dict, MISPOrganisation]:
|
||||||
'''Add an organisation'''
|
'''Add an organisation'''
|
||||||
r = self._prepare_request('POST', f'admin/organisations/add', data=organisation)
|
r = self._prepare_request('POST', 'admin/organisations/add', data=organisation)
|
||||||
new_organisation = self._check_json_response(r)
|
new_organisation = self._check_json_response(r)
|
||||||
if not (self.global_pythonify or pythonify) or 'errors' in new_organisation:
|
if not (self.global_pythonify or pythonify) or 'errors' in new_organisation:
|
||||||
return new_organisation
|
return new_organisation
|
||||||
|
@ -1342,7 +1342,7 @@ class PyMISP:
|
||||||
|
|
||||||
def add_user(self, user: MISPUser, pythonify: bool=False) -> Union[Dict, MISPUser]:
|
def add_user(self, user: MISPUser, pythonify: bool=False) -> Union[Dict, MISPUser]:
|
||||||
'''Add a new user'''
|
'''Add a new user'''
|
||||||
r = self._prepare_request('POST', f'admin/users/add', data=user)
|
r = self._prepare_request('POST', 'admin/users/add', data=user)
|
||||||
user_j = self._check_json_response(r)
|
user_j = self._check_json_response(r)
|
||||||
if not (self.global_pythonify or pythonify) or 'errors' in user_j:
|
if not (self.global_pythonify or pythonify) or 'errors' in user_j:
|
||||||
return user_j
|
return user_j
|
||||||
|
@ -1375,7 +1375,7 @@ class PyMISP:
|
||||||
return self._check_json_response(response)
|
return self._check_json_response(response)
|
||||||
|
|
||||||
def change_user_password(self, new_password: str, user: Optional[Union[MISPUser, int, str, UUID]]=None) -> Dict:
|
def change_user_password(self, new_password: str, user: Optional[Union[MISPUser, int, str, UUID]]=None) -> Dict:
|
||||||
response = self._prepare_request('POST', f'users/change_pw', data={'password': new_password})
|
response = self._prepare_request('POST', 'users/change_pw', data={'password': new_password})
|
||||||
return self._check_json_response(response)
|
return self._check_json_response(response)
|
||||||
|
|
||||||
def user_registrations(self, pythonify: bool=False) -> Union[Dict, List[MISPInbox]]:
|
def user_registrations(self, pythonify: bool=False) -> Union[Dict, List[MISPInbox]]:
|
||||||
|
@ -1887,9 +1887,9 @@ class PyMISP:
|
||||||
return normalized_response
|
return normalized_response
|
||||||
|
|
||||||
to_return = []
|
to_return = []
|
||||||
for l in normalized_response:
|
for log in normalized_response:
|
||||||
ml = MISPLog()
|
ml = MISPLog()
|
||||||
ml.from_dict(**l)
|
ml.from_dict(**log)
|
||||||
to_return.append(ml)
|
to_return.append(ml)
|
||||||
return to_return
|
return to_return
|
||||||
|
|
||||||
|
@ -2128,7 +2128,7 @@ class PyMISP:
|
||||||
query: Dict[str, Any] = {'setting': user_setting}
|
query: Dict[str, Any] = {'setting': user_setting}
|
||||||
if user:
|
if user:
|
||||||
query['user_id'] = get_uuid_or_id_from_abstract_misp(user)
|
query['user_id'] = get_uuid_or_id_from_abstract_misp(user)
|
||||||
response = self._prepare_request('POST', f'user_settings/getSetting')
|
response = self._prepare_request('POST', 'user_settings/getSetting')
|
||||||
user_setting_j = self._check_json_response(response)
|
user_setting_j = self._check_json_response(response)
|
||||||
if not (self.global_pythonify or pythonify) or 'errors' in user_setting_j:
|
if not (self.global_pythonify or pythonify) or 'errors' in user_setting_j:
|
||||||
return user_setting_j
|
return user_setting_j
|
||||||
|
@ -2145,7 +2145,7 @@ class PyMISP:
|
||||||
query['value'] = value
|
query['value'] = value
|
||||||
if user:
|
if user:
|
||||||
query['user_id'] = get_uuid_or_id_from_abstract_misp(user)
|
query['user_id'] = get_uuid_or_id_from_abstract_misp(user)
|
||||||
response = self._prepare_request('POST', f'user_settings/setSetting', data=query)
|
response = self._prepare_request('POST', 'user_settings/setSetting', data=query)
|
||||||
user_setting_j = self._check_json_response(response)
|
user_setting_j = self._check_json_response(response)
|
||||||
if not (self.global_pythonify or pythonify) or 'errors' in user_setting_j:
|
if not (self.global_pythonify or pythonify) or 'errors' in user_setting_j:
|
||||||
return user_setting_j
|
return user_setting_j
|
||||||
|
@ -2158,7 +2158,7 @@ class PyMISP:
|
||||||
query: Dict[str, Any] = {'setting': user_setting}
|
query: Dict[str, Any] = {'setting': user_setting}
|
||||||
if user:
|
if user:
|
||||||
query['user_id'] = get_uuid_or_id_from_abstract_misp(user)
|
query['user_id'] = get_uuid_or_id_from_abstract_misp(user)
|
||||||
response = self._prepare_request('POST', f'user_settings/delete', data=query)
|
response = self._prepare_request('POST', 'user_settings/delete', data=query)
|
||||||
return self._check_json_response(response)
|
return self._check_json_response(response)
|
||||||
|
|
||||||
# ## END User Settings ###
|
# ## END User Settings ###
|
||||||
|
|
|
@ -34,7 +34,7 @@ class CSVLoader():
|
||||||
self.fieldnames = fieldnames
|
self.fieldnames = fieldnames
|
||||||
|
|
||||||
if not self.fieldnames:
|
if not self.fieldnames:
|
||||||
raise Exception(f'No fieldnames, impossible to create objects.')
|
raise Exception('No fieldnames, impossible to create objects.')
|
||||||
else:
|
else:
|
||||||
# Check if the CSV file has a header, and if it matches with the object template
|
# Check if the CSV file has a header, and if it matches with the object template
|
||||||
tmp_object = MISPObject(self.template_name)
|
tmp_object = MISPObject(self.template_name)
|
||||||
|
|
|
@ -28,7 +28,7 @@ class SSHAuthorizedKeysObject(AbstractMISPObjectGenerator):
|
||||||
self.generate_attributes()
|
self.generate_attributes()
|
||||||
|
|
||||||
def generate_attributes(self):
|
def generate_attributes(self):
|
||||||
for l in self.__pseudofile:
|
for line in self.__pseudofile:
|
||||||
if l.startswith('ssh') or l.startswith('ecdsa'):
|
if line.startswith('ssh') or line.startswith('ecdsa'):
|
||||||
key = l.split(' ')[1]
|
key = line.split(' ')[1]
|
||||||
self.add_attribute('key', key)
|
self.add_attribute('key', key)
|
||||||
|
|
Loading…
Reference in New Issue