mirror of https://github.com/MISP/MISP
new: [test] Sync
parent
43dd5d9e58
commit
17e2790997
|
@ -242,6 +242,7 @@ jobs:
|
|||
poetry run python tests/testlive_comprehensive.py
|
||||
poetry add lxml
|
||||
poetry run python ../tests/testlive_security.py -v
|
||||
poetry run python ../tests/testlive_sync.py
|
||||
poetry run python tests/test_mispevent.py
|
||||
popd
|
||||
cp PyMISP/tests/keys.py PyMISP/examples/events/
|
||||
|
|
|
@ -738,10 +738,6 @@ class ServersController extends AppController
|
|||
if (!$this->_isSiteAdmin() && !($s['Server']['org_id'] == $this->Auth->user('org_id') && $this->_isAdmin())) {
|
||||
throw new MethodNotAllowedException(__('You are not authorised to do that.'));
|
||||
}
|
||||
$this->Server->id = $id;
|
||||
if (!$this->Server->exists()) {
|
||||
throw new NotFoundException(__('Invalid server'));
|
||||
}
|
||||
if (false == $this->Server->data['Server']['pull'] && ($technique == 'full' || $technique == 'incremental')) {
|
||||
$error = __('Pull setting not enabled for this server.');
|
||||
}
|
||||
|
@ -752,7 +748,7 @@ class ServersController extends AppController
|
|||
if (!Configure::read('MISP.background_jobs')) {
|
||||
$result = $this->Server->pull($this->Auth->user(), $id, $technique, $s);
|
||||
if (is_array($result)) {
|
||||
$success = sprintf(__('Pull completed. %s events pulled, %s events could not be pulled, %s proposals pulled, %s sightings pulled, %s clusters pulled.', count($result[0]), count($result[1]), $result[2], $result[3], $result[4]));
|
||||
$success = __('Pull completed. %s events pulled, %s events could not be pulled, %s proposals pulled, %s sightings pulled, %s clusters pulled.', count($result[0]), count($result[1]), $result[2], $result[3], $result[4]);
|
||||
} else {
|
||||
$error = $result;
|
||||
}
|
||||
|
@ -780,14 +776,14 @@ class ServersController extends AppController
|
|||
array('pull', $this->Auth->user('id'), $id, $technique, $jobId)
|
||||
);
|
||||
$this->Job->saveField('process_id', $process_id);
|
||||
$success = sprintf(__('Pull queued for background execution. Job ID: %s'), $jobId);
|
||||
$success = __('Pull queued for background execution. Job ID: %s', $jobId);
|
||||
}
|
||||
}
|
||||
if ($this->_isRest()) {
|
||||
if (!empty($error)) {
|
||||
return $this->RestResponse->saveFailResponse('Servers', 'pull', false, $error, $this->response->type());
|
||||
return $this->RestResponse->saveFailResponse('Servers', 'pull', $id, $error, $this->response->type());
|
||||
} else {
|
||||
return $this->RestResponse->saveSuccessResponse('Servers', 'pull', $success, $this->response->type());
|
||||
return $this->RestResponse->saveSuccessResponse('Servers', 'pull', $id, $this->response->type(), $success);
|
||||
}
|
||||
} else {
|
||||
if (!empty($error)) {
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
import os
|
||||
from pymisp import PyMISP
|
||||
|
||||
|
||||
def check_response(response):
|
||||
if isinstance(response, dict) and "errors" in response:
|
||||
raise Exception(response["errors"])
|
||||
|
||||
|
||||
# Load access information for env variables
|
||||
url = "http://" + os.environ["HOST"]
|
||||
key = os.environ["AUTH"]
|
||||
|
||||
pymisp = PyMISP(url, key)
|
||||
pymisp.global_pythonify = True
|
||||
|
||||
# Create new remote server, that is the same just for test
|
||||
remote_server = pymisp.add_server({
|
||||
"pull": True,
|
||||
"pull_galaxy_clusters": True,
|
||||
"remote_org_id": 1,
|
||||
"name": "Localhost",
|
||||
"url": url,
|
||||
"authkey": key,
|
||||
})
|
||||
check_response(remote_server)
|
||||
|
||||
# Check connection
|
||||
server_test = pymisp.test_server(remote_server)
|
||||
check_response(server_test)
|
||||
assert server_test["status"] == 1
|
||||
assert server_test["post"] == 1
|
||||
|
||||
# Get remote user
|
||||
url = f'servers/getRemoteUser/{remote_server["id"]}'
|
||||
remote_user = pymisp._check_json_response(pymisp._prepare_request('GET', url))
|
||||
check_response(remote_user)
|
||||
assert remote_user["Sync flag"] == "Yes"
|
||||
assert remote_user["Role name"] == "admin"
|
||||
assert remote_user["User"] == "admin@admin.test"
|
||||
|
||||
# Test pull
|
||||
url = f'servers/pull/{remote_server["id"]}/disable_background_processing:1'
|
||||
pull_response = pymisp._check_json_response(pymisp._prepare_request('GET', url))
|
||||
check_response(pull_response)
|
||||
assert "Pull completed. 0 events pulled, 0 events could not be pulled, 0 proposals pulled, 0 sightings pulled, 0 clusters pulled." == pull_response["message"], pull_response["message"]
|
||||
|
||||
# Delete server
|
||||
check_response(pymisp.delete_server(remote_server))
|
Loading…
Reference in New Issue