diff --git a/server/initializers/constants.js b/server/initializers/constants.js index 5ccd42773..0efbbb916 100644 --- a/server/initializers/constants.js +++ b/server/initializers/constants.js @@ -1,6 +1,7 @@ 'use strict' const config = require('config') +const maxBy = require('lodash/maxBy') const path = require('path') // --------------------------------------------------------------------------- @@ -92,9 +93,13 @@ const MONGO_MIGRATION_SCRIPTS = [ { script: '0015-admin-role', version: 15 + }, + { + script: '0020-requests-endpoint', + version: 20 } ] -const LAST_MONGO_SCHEMA_VERSION = 15 +const LAST_MONGO_SCHEMA_VERSION = (maxBy(MONGO_MIGRATION_SCRIPTS, 'version'))['version'] // --------------------------------------------------------------------------- @@ -116,6 +121,10 @@ const REQUESTS_LIMIT = 10 // Number of requests to retry for replay requests module const RETRY_REQUESTS = 5 +const REQUEST_ENDPOINTS = { + VIDEOS: 'videos' +} + // --------------------------------------------------------------------------- // Password encryption @@ -162,6 +171,7 @@ module.exports = { OAUTH_LIFETIME, PAGINATION_COUNT_DEFAULT, PODS_SCORE, + REQUEST_ENDPOINTS, REQUESTS_IN_PARALLEL, REQUESTS_INTERVAL, REQUESTS_LIMIT, diff --git a/server/initializers/migrations/0020-requests-endpoint.js b/server/initializers/migrations/0020-requests-endpoint.js new file mode 100644 index 000000000..55feec571 --- /dev/null +++ b/server/initializers/migrations/0020-requests-endpoint.js @@ -0,0 +1,15 @@ +/* + Set the endpoint videos for requests. +*/ + +const mongoose = require('mongoose') + +const Request = mongoose.model('Request') + +exports.up = function (callback) { + Request.update({ }, { endpoint: 'videos' }, callback) +} + +exports.down = function (callback) { + throw new Error('Not implemented.') +} diff --git a/server/lib/friends.js b/server/lib/friends.js index 3f100545c..eafffaab0 100644 --- a/server/lib/friends.js +++ b/server/lib/friends.js @@ -28,7 +28,7 @@ const friends = { } function addVideoToFriends (video) { - createRequest('add', video) + createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, video) } function hasFriends (callback) { @@ -119,7 +119,7 @@ function quitFriends (callback) { } function removeVideoToFriends (videoParams) { - createRequest('remove', videoParams) + createRequest('remove', constants.REQUEST_ENDPOINTS.VIDEOS, videoParams) } function sendOwnedVideosToPod (podId) { @@ -137,7 +137,7 @@ function sendOwnedVideosToPod (podId) { return } - createRequest('add', remoteVideo, [ podId ]) + createRequest('add', constants.REQUEST_ENDPOINTS.VIDEOS, remoteVideo, [ podId ]) }) }) }) @@ -250,8 +250,9 @@ function makeRequestsToWinningPods (cert, podsList, callback) { }) } -function createRequest (type, data, to) { +function createRequest (type, endpoint, data, to) { const req = new Request({ + endpoint, request: { type: type, data: data diff --git a/server/models/request.js b/server/models/request.js index 34a4287ea..f5eec2134 100644 --- a/server/models/request.js +++ b/server/models/request.js @@ -2,6 +2,7 @@ const each = require('async/each') const eachLimit = require('async/eachLimit') +const values = require('lodash/values') const mongoose = require('mongoose') const waterfall = require('async/waterfall') @@ -18,7 +19,16 @@ let lastRequestTimestamp = 0 const RequestSchema = mongoose.Schema({ request: mongoose.Schema.Types.Mixed, - to: [ { type: mongoose.Schema.Types.ObjectId, ref: 'Pod' } ] + endpoint: { + type: String, + enum: [ values(constants.REQUEST_ENDPOINTS) ] + }, + to: [ + { + type: mongoose.Schema.Types.ObjectId, + ref: 'Pod' + } + ] }) RequestSchema.statics = { @@ -93,7 +103,7 @@ function remainingMilliSeconds () { // --------------------------------------------------------------------------- // Make a requests to friends of a certain type -function makeRequest (toPod, requestsToMake, callback) { +function makeRequest (toPod, requestEndpoint, requestsToMake, callback) { if (!callback) callback = function () {} const params = { @@ -101,7 +111,7 @@ function makeRequest (toPod, requestsToMake, callback) { encrypt: true, // Security sign: true, // To prove our identity method: 'POST', - path: '/api/' + constants.API_VERSION + '/remote/videos', + path: '/api/' + constants.API_VERSION + '/remote/' + requestEndpoint, data: requestsToMake // Requests we need to make } @@ -144,31 +154,34 @@ function makeRequests () { logger.info('Making requests to friends.') - // Requests by pods id - const requestsToMake = {} + // We want to group requests by destinations pod and endpoint + const requestsToMakeGrouped = {} requests.forEach(function (poolRequest) { poolRequest.to.forEach(function (toPodId) { - if (!requestsToMake[toPodId]) { - requestsToMake[toPodId] = { - ids: [], - datas: [] + const hashKey = toPodId + poolRequest.endpoint + if (!requestsToMakeGrouped[hashKey]) { + requestsToMakeGrouped[hashKey] = { + toPodId, + endpoint: poolRequest.endpoint, + ids: [], // pool request ids, to delete them from the DB in the future + datas: [] // requests data, } } - requestsToMake[toPodId].ids.push(poolRequest._id) - requestsToMake[toPodId].datas.push(poolRequest.request) + requestsToMakeGrouped[hashKey].ids.push(poolRequest._id) + requestsToMakeGrouped[hashKey].datas.push(poolRequest.request) }) }) const goodPods = [] const badPods = [] - eachLimit(Object.keys(requestsToMake), constants.REQUESTS_IN_PARALLEL, function (toPodId, callbackEach) { - const requestToMake = requestsToMake[toPodId] + eachLimit(Object.keys(requestsToMakeGrouped), constants.REQUESTS_IN_PARALLEL, function (hashKey, callbackEach) { + const requestToMake = requestsToMakeGrouped[hashKey] // FIXME: mongodb request inside a loop :/ - Pod.load(toPodId, function (err, toPod) { + Pod.load(requestToMake.toPodId, function (err, toPod) { if (err) { logger.error('Error finding pod by id.', { err: err }) return callbackEach() @@ -176,21 +189,23 @@ function makeRequests () { // Maybe the pod is not our friend anymore so simply remove it if (!toPod) { - logger.info('Removing %d requests of unexisting pod %s.', requestToMake.ids.length, toPodId) - removePodOf.call(self, requestToMake.ids, toPodId) + const requestIdsToDelete = requestToMake.ids + + logger.info('Removing %d requests of unexisting pod %s.', requestIdsToDelete.length, requestToMake.toPodId) + removePodOf.call(self, requestIdsToDelete, requestToMake.toPodId) return callbackEach() } - makeRequest(toPod, requestToMake.datas, function (success) { + makeRequest(toPod, requestToMake.endpoint, requestToMake.datas, function (success) { if (success === true) { - logger.debug('Removing requests for %s pod.', toPodId, { requestsIds: requestToMake.ids }) + logger.debug('Removing requests for %s pod.', requestToMake.toPodId, { requestsIds: requestToMake.ids }) - goodPods.push(toPodId) + goodPods.push(requestToMake.toPodId) // Remove the pod id of these request ids - removePodOf.call(self, requestToMake.ids, toPodId, callbackEach) + removePodOf.call(self, requestToMake.ids, requestToMake.toPodId, callbackEach) } else { - badPods.push(toPodId) + badPods.push(requestToMake.toPodId) callbackEach() } }) @@ -260,7 +275,7 @@ function listWithLimitAndRandom (limit, callback) { let start = Math.floor(Math.random() * count) - limit if (start < 0) start = 0 - self.find({ }, { _id: 1, request: 1, to: 1 }).sort({ _id: 1 }).skip(start).limit(limit).exec(callback) + self.find().sort({ _id: 1 }).skip(start).limit(limit).exec(callback) }) }