From 1c4fa8841f3fb6728df7888a6f74f69e60f83c02 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Mon, 15 Apr 2024 19:03:51 -0600 Subject: [PATCH 01/27] Initial draft of fetching all replies on context load Signed-off-by: sneakers-the-rat --- app/controllers/api/v1/statuses_controller.rb | 11 +++++++++ .../activitypub/fetch_replies_service.rb | 23 ++++++++++++++----- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index 19cc71ae58..dbf7852dfe 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -47,6 +47,17 @@ class Api::V1::StatusesController < Api::BaseController ancestors_limit = ANCESTORS_LIMIT descendants_limit = DESCENDANTS_LIMIT descendants_depth_limit = DESCENDANTS_DEPTH_LIMIT + else + collection = @status['replies'] + + unless collection.nil? && @status.local? + ActivityPub::FetchRepliesService.new.call( + value_or_id(@status), + value_or_id(collection), + allow_synchronous_requests: true, + all_replies: true + ) + end end ancestors_results = @status.in_reply_to_id.nil? ? [] : @status.ancestors(ancestors_limit, current_account) diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index 46cab6caf9..f2fa1c2596 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -3,7 +3,14 @@ class ActivityPub::FetchRepliesService < BaseService include JsonLdHelper - def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil) + # Limit of fetched replies used when not fetching all replies + MAX_REPLIES_LOW = 5 + + def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, all_replies: false) + # Whether we are getting replies from more than the originating server, + # and don't limit ourselves to getting at most `MAX_REPLIES_LOW` + @all_replies = all_replies + @account = parent_status.account @allow_synchronous_requests = allow_synchronous_requests @@ -35,7 +42,7 @@ class ActivityPub::FetchRepliesService < BaseService def fetch_collection(collection_or_uri) return collection_or_uri if collection_or_uri.is_a?(Hash) return unless @allow_synchronous_requests - return if non_matching_uri_hosts?(@account.uri, collection_or_uri) + return if !@all_replies && non_matching_uri_hosts?(@account.uri, collection_or_uri) # NOTE: For backward compatibility reasons, Mastodon signs outgoing # queries incorrectly by default. @@ -54,10 +61,14 @@ class ActivityPub::FetchRepliesService < BaseService end def filtered_replies - # Only fetch replies to the same server as the original status to avoid - # amplification attacks. + if @all_replies + @items.map { |item| value_or_id(item) } + else + # Only fetch replies to the same server as the original status to avoid + # amplification attacks. - # Also limit to 5 fetched replies to limit potential for DoS. - @items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(5) + # Also limit to 5 fetched replies to limit potential for DoS. + @items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(MAX_REPLIES_LOW) + end end end From 506f54efe91ac07b20db9052f0a07a69f1dc11f3 Mon Sep 17 00:00:00 2001 From: jonny Date: Wed, 8 May 2024 01:55:55 -0700 Subject: [PATCH 02/27] committing all ugly with a bunch of logger calls in the middle but we are almost there baby Signed-off-by: sneakers-the-rat --- app/controllers/api/v1/statuses_controller.rb | 26 +++++++++++++------ .../activitypub/fetch_replies_service.rb | 11 ++++++++ 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index dbf7852dfe..f008a3fe95 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -2,6 +2,7 @@ class Api::V1::StatusesController < Api::BaseController include Authorization + include JsonLdHelper before_action -> { authorize_if_got_token! :read, :'read:statuses' }, except: [:create, :update, :destroy] before_action -> { doorkeeper_authorize! :write, :'write:statuses' }, only: [:create, :update, :destroy] @@ -48,15 +49,24 @@ class Api::V1::StatusesController < Api::BaseController descendants_limit = DESCENDANTS_LIMIT descendants_depth_limit = DESCENDANTS_DEPTH_LIMIT else - collection = @status['replies'] + unless @status.local? + json_status = fetch_resource(@status.uri, true, @current_account) - unless collection.nil? && @status.local? - ActivityPub::FetchRepliesService.new.call( - value_or_id(@status), - value_or_id(collection), - allow_synchronous_requests: true, - all_replies: true - ) + logger.warn "json status" + logger.warn json_status + # rescue this whole block on failure, don't want to fail the whole context request if we can't do this + collection = json_status['replies'] + logger.warn "replies uri" + logger.warn collection + + unless collection.nil? + ActivityPub::FetchRepliesService.new.call( + @status, + collection, + allow_synchronous_requests: true, + all_replies: true + ) + end end end diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index f2fa1c2596..c516fe759d 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -15,6 +15,9 @@ class ActivityPub::FetchRepliesService < BaseService @allow_synchronous_requests = allow_synchronous_requests @items = collection_items(collection_or_uri) + logger = Logger.new(STDOUT) + logger.warn 'collection items' + logger.warn @items return if @items.nil? FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id }] } @@ -25,12 +28,20 @@ class ActivityPub::FetchRepliesService < BaseService private def collection_items(collection_or_uri) + logger = Logger.new(STDOUT) collection = fetch_collection(collection_or_uri) + logger.warn 'first collection' + logger.warn collection return unless collection.is_a?(Hash) collection = fetch_collection(collection['first']) if collection['first'].present? + logger.warn 'second collection' + logger.warn collection return unless collection.is_a?(Hash) + # Need to do another "next" here. see https://neuromatch.social/users/jonny/statuses/112401738180959195/replies for example + # then we are home free (stopping for tonight tho.) + case collection['type'] when 'Collection', 'CollectionPage' as_array(collection['items']) From 24e1b79c1e98ef1ffc72fba12b3d1c35651102c4 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Wed, 18 Sep 2024 19:42:59 -0700 Subject: [PATCH 03/27] working (i think?) recursive fetch Signed-off-by: sneakers-the-rat --- app/controllers/api/v1/statuses_controller.rb | 28 +++++------ app/lib/activitypub/activity/create.rb | 2 +- app/models/status.rb | 10 ++++ .../activitypub/fetch_replies_service.rb | 46 ++++++++++++------- ...233930_add_fetched_replies_at_to_status.rb | 9 ++++ db/schema.rb | 1 + 6 files changed, 63 insertions(+), 33 deletions(-) create mode 100644 db/migrate/20240918233930_add_fetched_replies_at_to_status.rb diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index f008a3fe95..a908b10783 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -49,24 +49,20 @@ class Api::V1::StatusesController < Api::BaseController descendants_limit = DESCENDANTS_LIMIT descendants_depth_limit = DESCENDANTS_DEPTH_LIMIT else - unless @status.local? - json_status = fetch_resource(@status.uri, true, @current_account) + unless @status.local? && !@status.should_fetch_replies? + json_status = fetch_resource(@status.uri, true, @current_account) - logger.warn "json status" - logger.warn json_status - # rescue this whole block on failure, don't want to fail the whole context request if we can't do this - collection = json_status['replies'] - logger.warn "replies uri" - logger.warn collection + # rescue this whole block on failure, don't want to fail the whole context request if we can't do this + collection = json_status['replies'] - unless collection.nil? - ActivityPub::FetchRepliesService.new.call( - @status, - collection, - allow_synchronous_requests: true, - all_replies: true - ) - end + unless collection.nil? + ActivityPub::FetchRepliesService.new.call( + @status, + collection, + allow_synchronous_requests: true, + all_replies: true + ) + end end end diff --git a/app/lib/activitypub/activity/create.rb b/app/lib/activitypub/activity/create.rb index 85a66c6852..951f036170 100644 --- a/app/lib/activitypub/activity/create.rb +++ b/app/lib/activitypub/activity/create.rb @@ -328,7 +328,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity collection = @object['replies'] return if collection.blank? - replies = ActivityPub::FetchRepliesService.new.call(status, collection, allow_synchronous_requests: false, request_id: @options[:request_id]) + replies = ActivityPub::FetchRepliesService.new.call(status, collection, allow_synchronous_requests: false, request_id: @options[:request_id], all_replies: status.should_fetch_replies?) return unless replies.nil? uri = value_or_id(collection) diff --git a/app/models/status.rb b/app/models/status.rb index 5a81b00773..c935d6138d 100644 --- a/app/models/status.rb +++ b/app/models/status.rb @@ -27,6 +27,7 @@ # edited_at :datetime # trendable :boolean # ordered_media_attachment_ids :bigint(8) is an Array +# fetched_replies_at :datetime # class Status < ApplicationRecord @@ -177,6 +178,8 @@ class Status < ApplicationRecord delegate :domain, to: :account, prefix: true REAL_TIME_WINDOW = 6.hours + # debounce fetching all replies to minimize DoS + FETCH_REPLIES_DEBOUNCE = 1.hour def cache_key "v3:#{super}" @@ -411,6 +414,13 @@ class Status < ApplicationRecord end end + def should_fetch_replies? + # we aren't brand new, and we haven't fetched replies since the debounce window + created_at <= 10.minutes.ago && ( + fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_DEBOUNCE.ago + ) + end + private def update_status_stat!(attrs) diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index c516fe759d..d7a66456b1 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -5,22 +5,27 @@ class ActivityPub::FetchRepliesService < BaseService # Limit of fetched replies used when not fetching all replies MAX_REPLIES_LOW = 5 + # limit of fetched replies used when fetching all replies + MAX_REPLIES_HIGH = 500 def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, all_replies: false) # Whether we are getting replies from more than the originating server, # and don't limit ourselves to getting at most `MAX_REPLIES_LOW` @all_replies = all_replies + # store the status and whether we should fetch replies for it to avoid + # race conditions if something else updates us in the meantime + @status = parent_status + @should_fetch_replies = parent_status.should_fetch_replies? @account = parent_status.account @allow_synchronous_requests = allow_synchronous_requests @items = collection_items(collection_or_uri) - logger = Logger.new(STDOUT) - logger.warn 'collection items' - logger.warn @items return if @items.nil? FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id }] } + # Store last fetched all to debounce + @status.update(fetched_replies_at: Time.now.utc) if fetch_all_replies? @items end @@ -28,26 +33,30 @@ class ActivityPub::FetchRepliesService < BaseService private def collection_items(collection_or_uri) - logger = Logger.new(STDOUT) collection = fetch_collection(collection_or_uri) - logger.warn 'first collection' - logger.warn collection return unless collection.is_a?(Hash) collection = fetch_collection(collection['first']) if collection['first'].present? - logger.warn 'second collection' - logger.warn collection return unless collection.is_a?(Hash) - # Need to do another "next" here. see https://neuromatch.social/users/jonny/statuses/112401738180959195/replies for example - # then we are home free (stopping for tonight tho.) + all_items = [] + while collection.is_a?(Hash) + items = case collection['type'] + when 'Collection', 'CollectionPage' + collection['items'] + when 'OrderedCollection', 'OrderedCollectionPage' + collection['orderedItems'] + end - case collection['type'] - when 'Collection', 'CollectionPage' - as_array(collection['items']) - when 'OrderedCollection', 'OrderedCollectionPage' - as_array(collection['orderedItems']) + all_items.concat(as_array(items)) + + # Quit early if we are not fetching all replies + break if all_items.size >= MAX_REPLIES_HIGH || !fetch_all_replies? + + collection = collection['next'].present? ? fetch_collection(collection['next']) : nil end + + all_items end def fetch_collection(collection_or_uri) @@ -73,7 +82,8 @@ class ActivityPub::FetchRepliesService < BaseService def filtered_replies if @all_replies - @items.map { |item| value_or_id(item) } + # Reject all statuses that we already have in the db + @items.map { |item| value_or_id(item) }.reject { |uri| Status.exists?(uri: uri) } else # Only fetch replies to the same server as the original status to avoid # amplification attacks. @@ -82,4 +92,8 @@ class ActivityPub::FetchRepliesService < BaseService @items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(MAX_REPLIES_LOW) end end + + def fetch_all_replies? + @all_replies && @should_fetch_replies + end end diff --git a/db/migrate/20240918233930_add_fetched_replies_at_to_status.rb b/db/migrate/20240918233930_add_fetched_replies_at_to_status.rb new file mode 100644 index 0000000000..c42eff6aeb --- /dev/null +++ b/db/migrate/20240918233930_add_fetched_replies_at_to_status.rb @@ -0,0 +1,9 @@ +# frozen_string_literal: true + +class AddFetchedRepliesAtToStatus < ActiveRecord::Migration[7.1] + disable_ddl_transaction! + + def change + add_column :statuses, :fetched_replies_at, :datetime, null: true + end +end diff --git a/db/schema.rb b/db/schema.rb index d28bc5efca..224d052200 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -1052,6 +1052,7 @@ ActiveRecord::Schema[7.2].define(version: 2024_12_05_163118) do t.datetime "edited_at", precision: nil t.boolean "trendable" t.bigint "ordered_media_attachment_ids", array: true + t.datetime "fetched_replies_at" t.index ["account_id", "id", "visibility", "updated_at"], name: "index_statuses_20190820", order: { id: :desc }, where: "(deleted_at IS NULL)" t.index ["account_id"], name: "index_statuses_on_account_id" t.index ["deleted_at"], name: "index_statuses_on_deleted_at", where: "(deleted_at IS NOT NULL)" From d29497dbe2a770d5677389251f4bb3ee5937e3b9 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Wed, 18 Sep 2024 20:35:36 -0700 Subject: [PATCH 04/27] don't do it for every create, only do recursive reply expansion when requested from context endpoint, but async Signed-off-by: sneakers-the-rat --- app/controllers/api/v1/statuses_controller.rb | 2 +- app/lib/activitypub/activity/create.rb | 2 +- app/models/status.rb | 2 +- .../activitypub/fetch_replies_service.rb | 2 +- app/workers/fetch_reply_worker.rb | 23 ++++++++++++++-- db/schema.rb | 26 +++++++++---------- 6 files changed, 38 insertions(+), 19 deletions(-) diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index a908b10783..1bf7ddf668 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -50,7 +50,7 @@ class Api::V1::StatusesController < Api::BaseController descendants_depth_limit = DESCENDANTS_DEPTH_LIMIT else unless @status.local? && !@status.should_fetch_replies? - json_status = fetch_resource(@status.uri, true, @current_account) + json_status = fetch_resource(@status.uri, true, current_account) # rescue this whole block on failure, don't want to fail the whole context request if we can't do this collection = json_status['replies'] diff --git a/app/lib/activitypub/activity/create.rb b/app/lib/activitypub/activity/create.rb index 951f036170..85a66c6852 100644 --- a/app/lib/activitypub/activity/create.rb +++ b/app/lib/activitypub/activity/create.rb @@ -328,7 +328,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity collection = @object['replies'] return if collection.blank? - replies = ActivityPub::FetchRepliesService.new.call(status, collection, allow_synchronous_requests: false, request_id: @options[:request_id], all_replies: status.should_fetch_replies?) + replies = ActivityPub::FetchRepliesService.new.call(status, collection, allow_synchronous_requests: false, request_id: @options[:request_id]) return unless replies.nil? uri = value_or_id(collection) diff --git a/app/models/status.rb b/app/models/status.rb index c935d6138d..aaa76e7f98 100644 --- a/app/models/status.rb +++ b/app/models/status.rb @@ -416,7 +416,7 @@ class Status < ApplicationRecord def should_fetch_replies? # we aren't brand new, and we haven't fetched replies since the debounce window - created_at <= 10.minutes.ago && ( + !local? && created_at <= 10.minutes.ago && ( fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_DEBOUNCE.ago ) end diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index d7a66456b1..c1517837b2 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -23,7 +23,7 @@ class ActivityPub::FetchRepliesService < BaseService @items = collection_items(collection_or_uri) return if @items.nil? - FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id }] } + FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id }, @all_replies] } # Store last fetched all to debounce @status.update(fetched_replies_at: Time.now.utc) if fetch_all_replies? diff --git a/app/workers/fetch_reply_worker.rb b/app/workers/fetch_reply_worker.rb index 68a7414beb..af33e8d681 100644 --- a/app/workers/fetch_reply_worker.rb +++ b/app/workers/fetch_reply_worker.rb @@ -6,7 +6,26 @@ class FetchReplyWorker sidekiq_options queue: 'pull', retry: 3 - def perform(child_url, options = {}) - FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys) + def perform(child_url, options = {}, all_replies: false) + status = FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys) + + # asked to fetch replies recursively - do the second-level calls async + if all_replies && status.should_fetch_replies? + json_status = fetch_resource(status.uri, true) + + collection = json_status['replies'] + unless collection.nil? + # if expanding replies recursively, spread out the recursive calls + ActivityPub::FetchRepliesWorker.perform_in( + rand(1..30).seconds, + status.id, + collection, + { + allow_synchronous_requests: true, + all_replies: true, + } + ) + end + end end end diff --git a/db/schema.rb b/db/schema.rb index 224d052200..c1fbc3431b 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -556,12 +556,12 @@ ActiveRecord::Schema[7.2].define(version: 2024_12_05_163118) do end create_table "ip_blocks", force: :cascade do |t| - t.inet "ip", default: "0.0.0.0", null: false - t.integer "severity", default: 0, null: false - t.datetime "expires_at", precision: nil - t.text "comment", default: "", null: false t.datetime "created_at", precision: nil, null: false t.datetime "updated_at", precision: nil, null: false + t.datetime "expires_at", precision: nil + t.inet "ip", default: "0.0.0.0", null: false + t.integer "severity", default: 0, null: false + t.text "comment", default: "", null: false t.index ["ip"], name: "index_ip_blocks_on_ip", unique: true end @@ -1398,9 +1398,9 @@ ActiveRecord::Schema[7.2].define(version: 2024_12_05_163118) do add_index "instances", ["domain"], name: "index_instances_on_domain", unique: true create_view "user_ips", sql_definition: <<-SQL - SELECT user_id, - ip, - max(used_at) AS used_at + SELECT t0.user_id, + t0.ip, + max(t0.used_at) AS used_at FROM ( SELECT users.id AS user_id, users.sign_up_ip AS ip, users.created_at AS used_at @@ -1417,7 +1417,7 @@ ActiveRecord::Schema[7.2].define(version: 2024_12_05_163118) do login_activities.created_at FROM login_activities WHERE (login_activities.success = true)) t0 - GROUP BY user_id, ip; + GROUP BY t0.user_id, t0.ip; SQL create_view "account_summaries", materialized: true, sql_definition: <<-SQL SELECT accounts.id AS account_id, @@ -1438,9 +1438,9 @@ ActiveRecord::Schema[7.2].define(version: 2024_12_05_163118) do add_index "account_summaries", ["account_id"], name: "index_account_summaries_on_account_id", unique: true create_view "global_follow_recommendations", materialized: true, sql_definition: <<-SQL - SELECT account_id, - sum(rank) AS rank, - array_agg(reason) AS reason + SELECT t0.account_id, + sum(t0.rank) AS rank, + array_agg(t0.reason) AS reason FROM ( SELECT account_summaries.account_id, ((count(follows.id))::numeric / (1.0 + (count(follows.id))::numeric)) AS rank, 'most_followed'::text AS reason @@ -1464,8 +1464,8 @@ ActiveRecord::Schema[7.2].define(version: 2024_12_05_163118) do WHERE (follow_recommendation_suppressions.account_id = statuses.account_id))))) GROUP BY account_summaries.account_id HAVING (sum((status_stats.reblogs_count + status_stats.favourites_count)) >= (5)::numeric)) t0 - GROUP BY account_id - ORDER BY (sum(rank)) DESC; + GROUP BY t0.account_id + ORDER BY (sum(t0.rank)) DESC; SQL add_index "global_follow_recommendations", ["account_id"], name: "index_global_follow_recommendations_on_account_id", unique: true From aa7b197dc677b89fa03a6f72892e4aa60a58fa67 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Wed, 18 Sep 2024 21:07:28 -0700 Subject: [PATCH 05/27] correct number of args to replies worker, recursive fetching is working Signed-off-by: sneakers-the-rat --- app/models/status.rb | 2 +- app/services/activitypub/fetch_replies_service.rb | 2 +- app/workers/fetch_reply_worker.rb | 7 +++++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/app/models/status.rb b/app/models/status.rb index aaa76e7f98..5a6a139fb5 100644 --- a/app/models/status.rb +++ b/app/models/status.rb @@ -179,7 +179,7 @@ class Status < ApplicationRecord REAL_TIME_WINDOW = 6.hours # debounce fetching all replies to minimize DoS - FETCH_REPLIES_DEBOUNCE = 1.hour + FETCH_REPLIES_DEBOUNCE = 30.minutes def cache_key "v3:#{super}" diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index c1517837b2..18b3d2eddc 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -23,7 +23,7 @@ class ActivityPub::FetchRepliesService < BaseService @items = collection_items(collection_or_uri) return if @items.nil? - FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id }, @all_replies] } + FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id, 'all_replies' => @all_replies }] } # Store last fetched all to debounce @status.update(fetched_replies_at: Time.now.utc) if fetch_all_replies? diff --git a/app/workers/fetch_reply_worker.rb b/app/workers/fetch_reply_worker.rb index af33e8d681..280bc8406b 100644 --- a/app/workers/fetch_reply_worker.rb +++ b/app/workers/fetch_reply_worker.rb @@ -3,14 +3,17 @@ class FetchReplyWorker include Sidekiq::Worker include ExponentialBackoff + include JsonLdHelper sidekiq_options queue: 'pull', retry: 3 - def perform(child_url, options = {}, all_replies: false) + def perform(child_url, options = {}) + all_replies = options.delete('all_replies') + status = FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys) # asked to fetch replies recursively - do the second-level calls async - if all_replies && status.should_fetch_replies? + if all_replies && status json_status = fetch_resource(status.uri, true) collection = json_status['replies'] From 5e0b959660bc80cf3ddbd4ae35b53361a99b2066 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Wed, 18 Sep 2024 21:49:57 -0700 Subject: [PATCH 06/27] accept review comments https://github.com/NeuromatchAcademy/mastodon/pull/44\#discussion_r1766143286 and https://github.com/NeuromatchAcademy/mastodon/pull/44\#discussion_r1766148179 Signed-off-by: sneakers-the-rat --- app/services/activitypub/fetch_replies_service.rb | 2 +- db/migrate/20240918233930_add_fetched_replies_at_to_status.rb | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index 18b3d2eddc..c09d224263 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -25,7 +25,7 @@ class ActivityPub::FetchRepliesService < BaseService FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id, 'all_replies' => @all_replies }] } # Store last fetched all to debounce - @status.update(fetched_replies_at: Time.now.utc) if fetch_all_replies? + @status.touch(:fetched_replies_at) @items end diff --git a/db/migrate/20240918233930_add_fetched_replies_at_to_status.rb b/db/migrate/20240918233930_add_fetched_replies_at_to_status.rb index c42eff6aeb..229e43d978 100644 --- a/db/migrate/20240918233930_add_fetched_replies_at_to_status.rb +++ b/db/migrate/20240918233930_add_fetched_replies_at_to_status.rb @@ -1,8 +1,6 @@ # frozen_string_literal: true class AddFetchedRepliesAtToStatus < ActiveRecord::Migration[7.1] - disable_ddl_transaction! - def change add_column :statuses, :fetched_replies_at, :datetime, null: true end From f402fbcbdef4e075a24632e5287e0e14eb8536d3 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Sun, 29 Sep 2024 23:24:04 -0700 Subject: [PATCH 07/27] Remove recursion, separate out into separate workers/services, add limit to global maximum statuses fetched (untested, this might not work yet) Signed-off-by: sneakers-the-rat --- app/controllers/api/v1/statuses_controller.rb | 16 +++++-- .../concerns/status/fetch_replies_concern.rb | 27 +++++++++++ app/models/status.rb | 10 +---- .../activitypub/fetch_all_replies_service.rb | 37 +++++++++++++++ .../activitypub/fetch_replies_service.rb | 34 +++++--------- .../activitypub/fetch_all_replies_worker.rb | 45 +++++++++++++++++++ app/workers/fetch_reply_worker.rb | 23 +--------- 7 files changed, 134 insertions(+), 58 deletions(-) create mode 100644 app/models/concerns/status/fetch_replies_concern.rb create mode 100644 app/services/activitypub/fetch_all_replies_service.rb create mode 100644 app/workers/activitypub/fetch_all_replies_worker.rb diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index 1bf7ddf668..f53dd42077 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -66,15 +66,25 @@ class Api::V1::StatusesController < Api::BaseController end end - ancestors_results = @status.in_reply_to_id.nil? ? [] : @status.ancestors(ancestors_limit, current_account) + ancestors_results = @status.in_reply_to_id.nil? ? [] : @status.ancestors(ancestors_limit, current_account) descendants_results = @status.descendants(descendants_limit, current_account, descendants_depth_limit) - loaded_ancestors = preload_collection(ancestors_results, Status) - loaded_descendants = preload_collection(descendants_results, Status) + loaded_ancestors = preload_collection(ancestors_results, Status) + loaded_descendants = preload_collection(descendants_results, Status) @context = Context.new(ancestors: loaded_ancestors, descendants: loaded_descendants) statuses = [@status] + @context.ancestors + @context.descendants render json: @context, serializer: REST::ContextSerializer, relationships: StatusRelationshipsPresenter.new(statuses, current_user&.account_id) + + if @status.should_fetch_replies? + ActivityPub::FetchAllRepliesWorker.perform_async( + @status.id, + { + allow_synchronous_requests: true, + current_account_id: current_account.id, + } + ) + end end def create diff --git a/app/models/concerns/status/fetch_replies_concern.rb b/app/models/concerns/status/fetch_replies_concern.rb new file mode 100644 index 0000000000..10ddb71f09 --- /dev/null +++ b/app/models/concerns/status/fetch_replies_concern.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +module Status::FetchRepliesConcern + extend ActiveSupport::Concern + + # debounce fetching all replies to minimize DoS + FETCH_REPLIES_DEBOUNCE = 30.minutes + + CREATED_RECENTLY_DEBOUNCE = 10.minutes + + included do + scope :created_recently, -> { where(created_at: CREATED_RECENTLY_DEBOUNCE.ago..) } + scope :not_created_recently, -> { where(created_at: ..CREATED_RECENTLY_DEBOUNCE.ago) } + scope :fetched_recently, -> { where(fetched_replies_at: FETCH_REPLIES_DEBOUNCE.ago..) } + scope :not_fetched_recently, -> { where(fetched_replies_at: ..FETCH_REPLIES_DEBOUNCE.ago).or(where(fetched_replies_at: nil)) } + + scope :shouldnt_fetch_replies, -> { local.merge(created_recently).merge(fetched_recently) } + scope :should_fetch_replies, -> { local.invert_where.merge(not_created_recently).merge(not_fetched_recently) } + end + + def should_fetch_replies? + # we aren't brand new, and we haven't fetched replies since the debounce window + !local? && created_at <= 10.minutes.ago && ( + fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_DEBOUNCE.ago + ) + end +end diff --git a/app/models/status.rb b/app/models/status.rb index 5a6a139fb5..e5d4632fc9 100644 --- a/app/models/status.rb +++ b/app/models/status.rb @@ -35,6 +35,7 @@ class Status < ApplicationRecord include Discard::Model include Paginable include RateLimitable + include Status::FetchRepliesConcern include Status::SafeReblogInsert include Status::SearchConcern include Status::SnapshotConcern @@ -178,8 +179,6 @@ class Status < ApplicationRecord delegate :domain, to: :account, prefix: true REAL_TIME_WINDOW = 6.hours - # debounce fetching all replies to minimize DoS - FETCH_REPLIES_DEBOUNCE = 30.minutes def cache_key "v3:#{super}" @@ -414,13 +413,6 @@ class Status < ApplicationRecord end end - def should_fetch_replies? - # we aren't brand new, and we haven't fetched replies since the debounce window - !local? && created_at <= 10.minutes.ago && ( - fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_DEBOUNCE.ago - ) - end - private def update_status_stat!(attrs) diff --git a/app/services/activitypub/fetch_all_replies_service.rb b/app/services/activitypub/fetch_all_replies_service.rb new file mode 100644 index 0000000000..19fae824df --- /dev/null +++ b/app/services/activitypub/fetch_all_replies_service.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService + include JsonLdHelper + + # Limit of replies to fetch per status + MAX_REPLIES = 500 + + def call(collection_or_uri, allow_synchronous_requests: true, request_id: nil) + @allow_synchronous_requests = allow_synchronous_requests + @filter_by_host = false + + @items = collection_items(collection_or_uri) + @items = filtered_replies + return if @items.nil? + + FetchReplyWorker.push_bulk(@items) { |reply_uri| [reply_uri, { 'request_id' => request_id }] } + + @items + end + + private + + def filtered_replies + return if @items.nil? + + # find all statuses that we *shouldn't* update the replies for, and use that as a filter + uris = @items.map { |item| value_or_id(item) } + dont_update = Status.where(uri: uris).shouldnt_fetch_replies.pluck(:uri) + + # touch all statuses that already exist and that we're about to update + Status.where(uri: uris).should_fetch_replies.touch_all(:fetched_replies_at) + + # Reject all statuses that we already have in the db + uris.reject { |uri| dont_update.include?(uri) }.take(MAX_REPLIES) + end +end diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index c09d224263..dcaa9385bb 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -3,22 +3,13 @@ class ActivityPub::FetchRepliesService < BaseService include JsonLdHelper - # Limit of fetched replies used when not fetching all replies - MAX_REPLIES_LOW = 5 - # limit of fetched replies used when fetching all replies - MAX_REPLIES_HIGH = 500 - - def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, all_replies: false) - # Whether we are getting replies from more than the originating server, - # and don't limit ourselves to getting at most `MAX_REPLIES_LOW` - @all_replies = all_replies - # store the status and whether we should fetch replies for it to avoid - # race conditions if something else updates us in the meantime - @status = parent_status - @should_fetch_replies = parent_status.should_fetch_replies? + # Limit of fetched replies + MAX_REPLIES = 5 + def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, filter_by_host: true) @account = parent_status.account @allow_synchronous_requests = allow_synchronous_requests + @filter_by_host = filter_by_host @items = collection_items(collection_or_uri) return if @items.nil? @@ -51,7 +42,7 @@ class ActivityPub::FetchRepliesService < BaseService all_items.concat(as_array(items)) # Quit early if we are not fetching all replies - break if all_items.size >= MAX_REPLIES_HIGH || !fetch_all_replies? + break if all_items.size >= MAX_REPLIES collection = collection['next'].present? ? fetch_collection(collection['next']) : nil end @@ -62,7 +53,7 @@ class ActivityPub::FetchRepliesService < BaseService def fetch_collection(collection_or_uri) return collection_or_uri if collection_or_uri.is_a?(Hash) return unless @allow_synchronous_requests - return if !@all_replies && non_matching_uri_hosts?(@account.uri, collection_or_uri) + return if @filter_by_host && non_matching_uri_hosts?(@account.uri, collection_or_uri) # NOTE: For backward compatibility reasons, Mastodon signs outgoing # queries incorrectly by default. @@ -81,19 +72,14 @@ class ActivityPub::FetchRepliesService < BaseService end def filtered_replies - if @all_replies - # Reject all statuses that we already have in the db - @items.map { |item| value_or_id(item) }.reject { |uri| Status.exists?(uri: uri) } - else + if @filter_by_host # Only fetch replies to the same server as the original status to avoid # amplification attacks. # Also limit to 5 fetched replies to limit potential for DoS. - @items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(MAX_REPLIES_LOW) + @items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(MAX_REPLIES) + else + @items.map { |item| value_or_id(item) }.take(MAX_REPLIES) end end - - def fetch_all_replies? - @all_replies && @should_fetch_replies - end end diff --git a/app/workers/activitypub/fetch_all_replies_worker.rb b/app/workers/activitypub/fetch_all_replies_worker.rb new file mode 100644 index 0000000000..882faf7dca --- /dev/null +++ b/app/workers/activitypub/fetch_all_replies_worker.rb @@ -0,0 +1,45 @@ +# frozen_string_literal: true + +# Fetch all replies to a status, querying recursively through +# ActivityPub replies collections, fetching any statuses that +# we either don't already have or we haven't checked for new replies +# in the Status::FETCH_REPLIES_DEBOUNCE interval +class ActivityPub::FetchAllRepliesWorker + include Sidekiq::Worker + include ExponentialBackoff + + sidekiq_options queue: 'pull', retry: 3 + + # Global max replies to fetch per request + MAX_REPLIES = 1000 + + def perform(parent_status_id, options = {}) + @parent_status = Status.find(parent_status_id) + @current_account_id = options.fetch(:current_account_id, nil) + @current_account = @current_account_id.nil? ? nil : Account.find(id: @current_account_id) + + all_replies = get_replies(@parent_status.uri) + got_replies = all_replies.length + until all_replies.empty? || got_replies >= MAX_REPLIES + new_replies = get_replies(all_replies.pop) + got_replies += new_replies.length + all_replies << new_replies + end + end + + private + + def get_replies(status_uri) + replies_uri = get_replies_uri(status_uri) + return if replies_uri.nil? + + ActivityPub::FetchAllRepliesService.new.call(replies_uri, **options.deep_symbolize_keys) + end + + def get_replies_uri(parent_status_uri) + json_status = fetch_resource(parent_status_uri, true, @current_account) + replies_uri = json_status['replies'] + Rails.logger.debug { "Could not find replies uri for status URI: #{parent_status_uri}" } if replies_uri.nil? + replies_uri + end +end diff --git a/app/workers/fetch_reply_worker.rb b/app/workers/fetch_reply_worker.rb index 280bc8406b..08739babf0 100644 --- a/app/workers/fetch_reply_worker.rb +++ b/app/workers/fetch_reply_worker.rb @@ -8,27 +8,6 @@ class FetchReplyWorker sidekiq_options queue: 'pull', retry: 3 def perform(child_url, options = {}) - all_replies = options.delete('all_replies') - - status = FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys) - - # asked to fetch replies recursively - do the second-level calls async - if all_replies && status - json_status = fetch_resource(status.uri, true) - - collection = json_status['replies'] - unless collection.nil? - # if expanding replies recursively, spread out the recursive calls - ActivityPub::FetchRepliesWorker.perform_in( - rand(1..30).seconds, - status.id, - collection, - { - allow_synchronous_requests: true, - all_replies: true, - } - ) - end - end + FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys) end end From 6ddc66a974f3c6892d6be7dbf35058f62200e8d0 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Sun, 29 Sep 2024 23:29:52 -0700 Subject: [PATCH 08/27] rm redundant request to fetch replies worker in controller Signed-off-by: sneakers-the-rat --- app/controllers/api/v1/statuses_controller.rb | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index f53dd42077..72cb12853f 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -48,22 +48,6 @@ class Api::V1::StatusesController < Api::BaseController ancestors_limit = ANCESTORS_LIMIT descendants_limit = DESCENDANTS_LIMIT descendants_depth_limit = DESCENDANTS_DEPTH_LIMIT - else - unless @status.local? && !@status.should_fetch_replies? - json_status = fetch_resource(@status.uri, true, current_account) - - # rescue this whole block on failure, don't want to fail the whole context request if we can't do this - collection = json_status['replies'] - - unless collection.nil? - ActivityPub::FetchRepliesService.new.call( - @status, - collection, - allow_synchronous_requests: true, - all_replies: true - ) - end - end end ancestors_results = @status.in_reply_to_id.nil? ? [] : @status.ancestors(ancestors_limit, current_account) @@ -76,7 +60,7 @@ class Api::V1::StatusesController < Api::BaseController render json: @context, serializer: REST::ContextSerializer, relationships: StatusRelationshipsPresenter.new(statuses, current_user&.account_id) - if @status.should_fetch_replies? + if !current_account.nil? && @status.should_fetch_replies? ActivityPub::FetchAllRepliesWorker.perform_async( @status.id, { From 7023ee85e3468c82a889d0637559a34cb3098a31 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Sun, 29 Sep 2024 23:32:25 -0700 Subject: [PATCH 09/27] rm zombie code in fetch_replies_service Signed-off-by: sneakers-the-rat --- app/services/activitypub/fetch_replies_service.rb | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index dcaa9385bb..0dcdb638e1 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -14,9 +14,7 @@ class ActivityPub::FetchRepliesService < BaseService @items = collection_items(collection_or_uri) return if @items.nil? - FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id, 'all_replies' => @all_replies }] } - # Store last fetched all to debounce - @status.touch(:fetched_replies_at) + FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id }] } @items end From 6600f28fea453eb820223cc56feb59a1785f5a7d Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Sun, 29 Sep 2024 23:37:47 -0700 Subject: [PATCH 10/27] rm spurious imports and reformatting Signed-off-by: sneakers-the-rat --- app/controllers/api/v1/statuses_controller.rb | 11 +++++------ app/workers/fetch_reply_worker.rb | 1 - 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index 72cb12853f..d9ca69d534 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -2,7 +2,6 @@ class Api::V1::StatusesController < Api::BaseController include Authorization - include JsonLdHelper before_action -> { authorize_if_got_token! :read, :'read:statuses' }, except: [:create, :update, :destroy] before_action -> { doorkeeper_authorize! :write, :'write:statuses' }, only: [:create, :update, :destroy] @@ -45,15 +44,15 @@ class Api::V1::StatusesController < Api::BaseController descendants_depth_limit = nil if current_account.nil? - ancestors_limit = ANCESTORS_LIMIT - descendants_limit = DESCENDANTS_LIMIT + ancestors_limit = ANCESTORS_LIMIT + descendants_limit = DESCENDANTS_LIMIT descendants_depth_limit = DESCENDANTS_DEPTH_LIMIT end - ancestors_results = @status.in_reply_to_id.nil? ? [] : @status.ancestors(ancestors_limit, current_account) + ancestors_results = @status.in_reply_to_id.nil? ? [] : @status.ancestors(ancestors_limit, current_account) descendants_results = @status.descendants(descendants_limit, current_account, descendants_depth_limit) - loaded_ancestors = preload_collection(ancestors_results, Status) - loaded_descendants = preload_collection(descendants_results, Status) + loaded_ancestors = preload_collection(ancestors_results, Status) + loaded_descendants = preload_collection(descendants_results, Status) @context = Context.new(ancestors: loaded_ancestors, descendants: loaded_descendants) statuses = [@status] + @context.ancestors + @context.descendants diff --git a/app/workers/fetch_reply_worker.rb b/app/workers/fetch_reply_worker.rb index 08739babf0..68a7414beb 100644 --- a/app/workers/fetch_reply_worker.rb +++ b/app/workers/fetch_reply_worker.rb @@ -3,7 +3,6 @@ class FetchReplyWorker include Sidekiq::Worker include ExponentialBackoff - include JsonLdHelper sidekiq_options queue: 'pull', retry: 3 From aec98b8b4a5ad9d9a023c282a1eb1ec791a47e62 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Sun, 29 Sep 2024 23:38:23 -0700 Subject: [PATCH 11/27] rm more spurious formatting Signed-off-by: sneakers-the-rat --- app/controllers/api/v1/statuses_controller.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index d9ca69d534..4d345566be 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -44,8 +44,8 @@ class Api::V1::StatusesController < Api::BaseController descendants_depth_limit = nil if current_account.nil? - ancestors_limit = ANCESTORS_LIMIT - descendants_limit = DESCENDANTS_LIMIT + ancestors_limit = ANCESTORS_LIMIT + descendants_limit = DESCENDANTS_LIMIT descendants_depth_limit = DESCENDANTS_DEPTH_LIMIT end From f803230ab3a4917fb60591cc46a990f3c8e44a46 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Sat, 12 Oct 2024 21:02:31 -0700 Subject: [PATCH 12/27] Working version of fetch all replies service with global maximum on fetching Signed-off-by: sneakers-the-rat --- app/controllers/api/v1/statuses_controller.rb | 2 +- .../concerns/status/fetch_replies_concern.rb | 2 +- .../activitypub/fetch_all_replies_worker.rb | 37 +++++++++++++------ 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index 4d345566be..f131eaed09 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -62,9 +62,9 @@ class Api::V1::StatusesController < Api::BaseController if !current_account.nil? && @status.should_fetch_replies? ActivityPub::FetchAllRepliesWorker.perform_async( @status.id, + current_account.id, { allow_synchronous_requests: true, - current_account_id: current_account.id, } ) end diff --git a/app/models/concerns/status/fetch_replies_concern.rb b/app/models/concerns/status/fetch_replies_concern.rb index 10ddb71f09..696e93ec88 100644 --- a/app/models/concerns/status/fetch_replies_concern.rb +++ b/app/models/concerns/status/fetch_replies_concern.rb @@ -20,7 +20,7 @@ module Status::FetchRepliesConcern def should_fetch_replies? # we aren't brand new, and we haven't fetched replies since the debounce window - !local? && created_at <= 10.minutes.ago && ( + !local? && created_at <= CREATED_RECENTLY_DEBOUNCE.ago && ( fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_DEBOUNCE.ago ) end diff --git a/app/workers/activitypub/fetch_all_replies_worker.rb b/app/workers/activitypub/fetch_all_replies_worker.rb index 882faf7dca..af2cf40e91 100644 --- a/app/workers/activitypub/fetch_all_replies_worker.rb +++ b/app/workers/activitypub/fetch_all_replies_worker.rb @@ -7,29 +7,39 @@ class ActivityPub::FetchAllRepliesWorker include Sidekiq::Worker include ExponentialBackoff + include JsonLdHelper sidekiq_options queue: 'pull', retry: 3 # Global max replies to fetch per request MAX_REPLIES = 1000 - def perform(parent_status_id, options = {}) + def perform(parent_status_id, current_account_id = nil, options = {}) @parent_status = Status.find(parent_status_id) - @current_account_id = options.fetch(:current_account_id, nil) - @current_account = @current_account_id.nil? ? nil : Account.find(id: @current_account_id) + @current_account_id = current_account_id + @current_account = @current_account_id.nil? ? nil : Account.find(@current_account_id) + Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: Fetching all replies for status: #{@parent_status}" } - all_replies = get_replies(@parent_status.uri) + all_replies = get_replies(@parent_status.uri, options) got_replies = all_replies.length until all_replies.empty? || got_replies >= MAX_REPLIES - new_replies = get_replies(all_replies.pop) + next_reply = all_replies.pop + next if next_reply.nil? + + new_replies = get_replies(next_reply, options) + next if new_replies.nil? + got_replies += new_replies.length - all_replies << new_replies + all_replies.concat(new_replies) end + + Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: fetched #{got_replies} replies" } + got_replies end private - def get_replies(status_uri) + def get_replies(status_uri, options = {}) replies_uri = get_replies_uri(status_uri) return if replies_uri.nil? @@ -37,9 +47,14 @@ class ActivityPub::FetchAllRepliesWorker end def get_replies_uri(parent_status_uri) - json_status = fetch_resource(parent_status_uri, true, @current_account) - replies_uri = json_status['replies'] - Rails.logger.debug { "Could not find replies uri for status URI: #{parent_status_uri}" } if replies_uri.nil? - replies_uri + begin + json_status = fetch_resource(parent_status_uri, true, @current_account) + replies_uri = json_status['replies'] + Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status_id}: replies URI was nil" } if replies_uri.nil? + replies_uri + rescue => e + Rails.logger.error { "FetchAllRepliesWorker - #{@parent_status_id}: Got exception while resolving replies URI: #{e} - #{e.message}" } + nil + end end end From 1f7142efdb8c01b98f2e41c1b1dea5a75f65e88a Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Sat, 12 Oct 2024 21:26:12 -0700 Subject: [PATCH 13/27] Fix limit in fetch_replies_service to not always limit by 5 (which always caused us to only do one page). Rename some variables to make purpose clearer. Return the array of all fetched uris instead of just the number we got Signed-off-by: sneakers-the-rat --- .../activitypub/fetch_all_replies_service.rb | 4 +++- .../activitypub/fetch_replies_service.rb | 8 ++++--- .../activitypub/fetch_all_replies_worker.rb | 22 +++++++++---------- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/app/services/activitypub/fetch_all_replies_service.rb b/app/services/activitypub/fetch_all_replies_service.rb index 19fae824df..cba476ab46 100644 --- a/app/services/activitypub/fetch_all_replies_service.rb +++ b/app/services/activitypub/fetch_all_replies_service.rb @@ -10,7 +10,7 @@ class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService @allow_synchronous_requests = allow_synchronous_requests @filter_by_host = false - @items = collection_items(collection_or_uri) + @items = collection_items(collection_or_uri, fetch_all: true) @items = filtered_replies return if @items.nil? @@ -25,6 +25,8 @@ class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService return if @items.nil? # find all statuses that we *shouldn't* update the replies for, and use that as a filter + # Typically we assume this is smaller than the replies we *should* fetch, + # so we minimize the number of uris we should load here. uris = @items.map { |item| value_or_id(item) } dont_update = Status.where(uri: uris).shouldnt_fetch_replies.pluck(:uri) diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index 0dcdb638e1..2963f616e2 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -5,6 +5,8 @@ class ActivityPub::FetchRepliesService < BaseService # Limit of fetched replies MAX_REPLIES = 5 + # Limit when fetching all (to prevent infinite fetch attack) + FETCH_ALL_MAX_REPLIES = 500 def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, filter_by_host: true) @account = parent_status.account @@ -21,7 +23,7 @@ class ActivityPub::FetchRepliesService < BaseService private - def collection_items(collection_or_uri) + def collection_items(collection_or_uri, fetch_all: false) collection = fetch_collection(collection_or_uri) return unless collection.is_a?(Hash) @@ -39,8 +41,8 @@ class ActivityPub::FetchRepliesService < BaseService all_items.concat(as_array(items)) - # Quit early if we are not fetching all replies - break if all_items.size >= MAX_REPLIES + # Quit early if we are not fetching all replies or we've reached the absolute max + break if (!fetch_all && all_items.size >= MAX_REPLIES) || (all_items.size >= FETCH_ALL_MAX_REPLIES) collection = collection['next'].present? ? fetch_collection(collection['next']) : nil end diff --git a/app/workers/activitypub/fetch_all_replies_worker.rb b/app/workers/activitypub/fetch_all_replies_worker.rb index af2cf40e91..222736224b 100644 --- a/app/workers/activitypub/fetch_all_replies_worker.rb +++ b/app/workers/activitypub/fetch_all_replies_worker.rb @@ -11,7 +11,7 @@ class ActivityPub::FetchAllRepliesWorker sidekiq_options queue: 'pull', retry: 3 - # Global max replies to fetch per request + # Global max replies to fetch per request (all replies, recursively) MAX_REPLIES = 1000 def perform(parent_status_id, current_account_id = nil, options = {}) @@ -20,21 +20,21 @@ class ActivityPub::FetchAllRepliesWorker @current_account = @current_account_id.nil? ? nil : Account.find(@current_account_id) Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: Fetching all replies for status: #{@parent_status}" } - all_replies = get_replies(@parent_status.uri, options) - got_replies = all_replies.length - until all_replies.empty? || got_replies >= MAX_REPLIES - next_reply = all_replies.pop + uris_to_fetch = get_replies(@parent_status.uri, options) + fetched_uris = uris_to_fetch + until uris_to_fetch.empty? || fetched_uris.length >= MAX_REPLIES + next_reply = uris_to_fetch.pop next if next_reply.nil? - new_replies = get_replies(next_reply, options) - next if new_replies.nil? + new_reply_uris = get_replies(next_reply, options) + next if new_reply_uris.nil? - got_replies += new_replies.length - all_replies.concat(new_replies) + uris_to_fetch.concat(new_reply_uris) + fetched_uris.concat(new_reply_uris) end - Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: fetched #{got_replies} replies" } - got_replies + Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: fetched #{fetched_uris.length} replies" } + fetched_uris end private From 7cf6177666a9ade3c9ffb3c942859cce6afa459d Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Sun, 13 Oct 2024 00:58:45 -0700 Subject: [PATCH 14/27] the most basic test you could imagine Signed-off-by: sneakers-the-rat --- .../activitypub/fetch_all_replies_service.rb | 12 ++- .../fetch_all_replies_service_spec.rb | 73 +++++++++++++++++++ 2 files changed, 82 insertions(+), 3 deletions(-) create mode 100644 spec/services/activitypub/fetch_all_replies_service_spec.rb diff --git a/app/services/activitypub/fetch_all_replies_service.rb b/app/services/activitypub/fetch_all_replies_service.rb index cba476ab46..292d5aad30 100644 --- a/app/services/activitypub/fetch_all_replies_service.rb +++ b/app/services/activitypub/fetch_all_replies_service.rb @@ -24,9 +24,15 @@ class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService def filtered_replies return if @items.nil? - # find all statuses that we *shouldn't* update the replies for, and use that as a filter - # Typically we assume this is smaller than the replies we *should* fetch, - # so we minimize the number of uris we should load here. + # Find all statuses that we *shouldn't* update the replies for, and use that as a filter. + # We don't assume that we have the statuses before they're created, + # hence the negative filter - + # "keep all these uris except the ones we already have" + # instead of + # "keep all these uris that match some conditions on existing Status objects" + # + # Typically we assume the number of replies we *shouldn't* fetch is smaller than the + # replies we *should* fetch, so we also minimize the number of uris we should load here. uris = @items.map { |item| value_or_id(item) } dont_update = Status.where(uri: uris).shouldnt_fetch_replies.pluck(:uri) diff --git a/spec/services/activitypub/fetch_all_replies_service_spec.rb b/spec/services/activitypub/fetch_all_replies_service_spec.rb new file mode 100644 index 0000000000..5ab1536d51 --- /dev/null +++ b/spec/services/activitypub/fetch_all_replies_service_spec.rb @@ -0,0 +1,73 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe ActivityPub::FetchAllRepliesService do + subject { described_class.new } + + let(:actor) { Fabricate(:account, domain: 'example.com', uri: 'http://example.com/account') } + let(:status) { Fabricate(:status, account: actor) } + let(:collection_uri) { 'http://example.com/replies/1' } + + let(:items) do + [ + 'http://example.com/self-reply-1', + 'http://example.com/self-reply-2', + 'http://example.com/self-reply-3', + 'http://other.com/other-reply-1', + 'http://other.com/other-reply-2', + 'http://other.com/other-reply-3', + 'http://example.com/self-reply-4', + 'http://example.com/self-reply-5', + 'http://example.com/self-reply-6', + ] + end + + let(:payload) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + type: 'Collection', + id: collection_uri, + items: items, + }.with_indifferent_access + end + + describe '#call' do + it 'fetches more than the default maximum and from multiple domains' do + allow(FetchReplyWorker).to receive(:push_bulk) + + subject.call(payload) + + expect(FetchReplyWorker).to have_received(:push_bulk).with(%w(http://example.com/self-reply-1 http://example.com/self-reply-2 http://example.com/self-reply-3 http://other.com/other-reply-1 http://other.com/other-reply-2 http://other.com/other-reply-3 http://example.com/self-reply-4 + http://example.com/self-reply-5 http://example.com/self-reply-6)) + end + + context 'with a recent status' do + before do + Fabricate(:status, uri: 'http://example.com/self-reply-2', fetched_replies_at: 1.second.ago, local: false) + end + + it 'skips statuses that have been updated recently' do + allow(FetchReplyWorker).to receive(:push_bulk) + + subject.call(payload) + + expect(FetchReplyWorker).to have_received(:push_bulk).with(%w(http://example.com/self-reply-1 http://example.com/self-reply-3 http://other.com/other-reply-1 http://other.com/other-reply-2 http://other.com/other-reply-3 http://example.com/self-reply-4 http://example.com/self-reply-5 http://example.com/self-reply-6)) + end + end + + context 'with an old status' do + before do + Fabricate(:status, uri: 'http://other.com/other-reply-1', fetched_replies_at: 1.year.ago, created_at: 1.year.ago, account: actor) + end + + it 'updates the time that fetched statuses were last fetched' do + allow(FetchReplyWorker).to receive(:push_bulk) + + subject.call(payload) + + expect(Status.find_by(uri: 'http://other.com/other-reply-1').fetched_replies_at).to be >= 1.minute.ago + end + end + end +end From 2e21402330d8c8c974b4847765dfb3c9ee6d10b6 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Sun, 13 Oct 2024 18:10:45 -0700 Subject: [PATCH 15/27] tests for the fetch all reply worker Signed-off-by: sneakers-the-rat --- .../activitypub/fetch_all_replies_worker.rb | 16 +- .../fetch_all_replies_worker_spec.rb | 254 ++++++++++++++++++ 2 files changed, 263 insertions(+), 7 deletions(-) create mode 100644 spec/workers/activitypub/fetch_all_replies_worker_spec.rb diff --git a/app/workers/activitypub/fetch_all_replies_worker.rb b/app/workers/activitypub/fetch_all_replies_worker.rb index 222736224b..53e132f61e 100644 --- a/app/workers/activitypub/fetch_all_replies_worker.rb +++ b/app/workers/activitypub/fetch_all_replies_worker.rb @@ -21,7 +21,9 @@ class ActivityPub::FetchAllRepliesWorker Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: Fetching all replies for status: #{@parent_status}" } uris_to_fetch = get_replies(@parent_status.uri, options) - fetched_uris = uris_to_fetch + fetched_uris = uris_to_fetch.clone + return if uris_to_fetch.nil? + until uris_to_fetch.empty? || fetched_uris.length >= MAX_REPLIES next_reply = uris_to_fetch.pop next if next_reply.nil? @@ -40,18 +42,18 @@ class ActivityPub::FetchAllRepliesWorker private def get_replies(status_uri, options = {}) - replies_uri = get_replies_uri(status_uri) - return if replies_uri.nil? + replies_collection_or_uri = get_replies_uri(status_uri) + return if replies_collection_or_uri.nil? - ActivityPub::FetchAllRepliesService.new.call(replies_uri, **options.deep_symbolize_keys) + ActivityPub::FetchAllRepliesService.new.call(replies_collection_or_uri, **options.deep_symbolize_keys) end def get_replies_uri(parent_status_uri) begin json_status = fetch_resource(parent_status_uri, true, @current_account) - replies_uri = json_status['replies'] - Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status_id}: replies URI was nil" } if replies_uri.nil? - replies_uri + replies_collection_or_uri = json_status['replies'] + Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status_id}: replies URI was nil" } if replies_collection_or_uri.nil? + replies_collection_or_uri rescue => e Rails.logger.error { "FetchAllRepliesWorker - #{@parent_status_id}: Got exception while resolving replies URI: #{e} - #{e.message}" } nil diff --git a/spec/workers/activitypub/fetch_all_replies_worker_spec.rb b/spec/workers/activitypub/fetch_all_replies_worker_spec.rb new file mode 100644 index 0000000000..35fadb6caf --- /dev/null +++ b/spec/workers/activitypub/fetch_all_replies_worker_spec.rb @@ -0,0 +1,254 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe ActivityPub::FetchAllRepliesWorker do + subject { described_class.new } + + let(:top_items) do + [ + 'http://example.com/self-reply-1', + 'http://other.com/other-reply-2', + 'http://example.com/self-reply-3', + ] + end + + let(:top_items_paged) do + [ + 'http://example.com/self-reply-4', + 'http://other.com/other-reply-5', + 'http://example.com/self-reply-6', + ] + end + + let(:nested_items) do + [ + 'http://example.com/nested-self-reply-1', + 'http://other.com/nested-other-reply-2', + 'http://example.com/nested-self-reply-3', + ] + end + + let(:nested_items_paged) do + [ + 'http://example.com/nested-self-reply-4', + 'http://other.com/nested-other-reply-5', + 'http://example.com/nested-self-reply-6', + ] + end + + let(:all_items) do + top_items + top_items_paged + nested_items + nested_items_paged + end + + let(:top_note_uri) do + 'http://example.com/top-post' + end + + let(:top_collection_uri) do + 'http://example.com/top-post/replies' + end + + # The reply uri that has the nested replies under it + let(:reply_note_uri) do + 'http://other.com/other-reply-2' + end + + # The collection uri of nested replies + let(:reply_collection_uri) do + 'http://other.com/other-reply-2/replies' + end + + let(:replies_top) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: top_collection_uri, + type: 'Collection', + items: top_items + top_items_paged, + } + end + + let(:replies_nested) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: reply_collection_uri, + type: 'Collection', + items: nested_items + nested_items_paged, + } + end + + # The status resource for the top post + let(:top_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: top_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: replies_top, + attributedTo: 'https://example.com', + } + end + + # The status resource that has the uri to the replies collection + let(:reply_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: reply_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: replies_nested, + attributedTo: 'https://other.com', + } + end + + let(:empty_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: 'https://example.com/empty', + type: 'Note', + content: 'Lorem ipsum', + replies: [], + attributedTo: 'https://example.com', + } + end + + let(:account) { Fabricate(:account, domain: 'example.com') } + let(:status) { Fabricate(:status, account: account, uri: top_note_uri) } + + before do + allow(FetchReplyWorker).to receive(:push_bulk) + all_items.each do |item| + next if [top_note_uri, reply_note_uri].include? item + + stub_request(:get, item).to_return(status: 200, body: Oj.dump(empty_object), headers: { 'Content-Type': 'application/activity+json' }) + end + end + + shared_examples 'fetches all replies' do + before do + stub_request(:get, top_note_uri).to_return(status: 200, body: Oj.dump(top_object), headers: { 'Content-Type': 'application/activity+json' }) + stub_request(:get, reply_note_uri).to_return(status: 200, body: Oj.dump(reply_object), headers: { 'Content-Type': 'application/activity+json' }) + end + + it 'fetches statuses recursively' do + got_uris = subject.perform(status.id) + expect(got_uris).to match_array(all_items) + end + + it 'respects the maxium limits set by not recursing after the max is reached' do + stub_const('ActivityPub::FetchAllRepliesWorker::MAX_REPLIES', 5) + got_uris = subject.perform(status.id) + expect(got_uris).to match_array(top_items + top_items_paged) + end + end + + describe 'perform' do + context 'when the payload is a Note with replies as a Collection of inlined replies' do + it_behaves_like 'fetches all replies' + end + + context 'when the payload is a Note with replies as a URI to a Collection' do + let(:top_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: top_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: top_collection_uri, + attributedTo: 'https://example.com', + } + end + let(:reply_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: reply_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: reply_collection_uri, + attributedTo: 'https://other.com', + } + end + + before do + stub_request(:get, top_collection_uri).to_return(status: 200, body: Oj.dump(replies_top), headers: { 'Content-Type': 'application/activity+json' }) + stub_request(:get, reply_collection_uri).to_return(status: 200, body: Oj.dump(replies_nested), headers: { 'Content-Type': 'application/activity+json' }) + end + + it_behaves_like 'fetches all replies' + end + + context 'when the payload is a Note with replies as a paginated collection' do + let(:top_page_2_uri) do + "#{top_collection_uri}/2" + end + + let(:reply_page_2_uri) do + "#{reply_collection_uri}/2" + end + + let(:top_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: top_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: { + type: 'Collection', + id: top_collection_uri, + first: { + type: 'CollectionPage', + partOf: top_collection_uri, + items: top_items, + next: top_page_2_uri, + }, + }, + attributedTo: 'https://example.com', + } + end + let(:reply_object) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: reply_note_uri, + type: 'Note', + content: 'Lorem ipsum', + replies: { + type: 'Collection', + id: reply_collection_uri, + first: { + type: 'CollectionPage', + partOf: reply_collection_uri, + items: nested_items, + next: reply_page_2_uri, + }, + }, + attributedTo: 'https://other.com', + } + end + + let(:top_page_two) do + { + type: 'CollectionPage', + id: top_page_2_uri, + partOf: top_collection_uri, + items: top_items_paged, + } + end + + let(:reply_page_two) do + { + type: 'CollectionPage', + id: reply_page_2_uri, + partOf: reply_collection_uri, + items: nested_items_paged, + } + end + + before do + stub_request(:get, top_page_2_uri).to_return(status: 200, body: Oj.dump(top_page_two), headers: { 'Content-Type': 'application/activity+json' }) + stub_request(:get, reply_page_2_uri).to_return(status: 200, body: Oj.dump(reply_page_two), headers: { 'Content-Type': 'application/activity+json' }) + end + + it_behaves_like 'fetches all replies' + end + end +end From 8e35490276ad73f7e91195ac62f7b4536a55af44 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Mon, 14 Oct 2024 22:17:14 -0700 Subject: [PATCH 16/27] remove redundant params - forgot i subclassed Signed-off-by: sneakers-the-rat --- app/services/activitypub/fetch_all_replies_service.rb | 2 +- app/services/activitypub/fetch_replies_service.rb | 7 ++----- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/app/services/activitypub/fetch_all_replies_service.rb b/app/services/activitypub/fetch_all_replies_service.rb index 292d5aad30..82b7943907 100644 --- a/app/services/activitypub/fetch_all_replies_service.rb +++ b/app/services/activitypub/fetch_all_replies_service.rb @@ -10,7 +10,7 @@ class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService @allow_synchronous_requests = allow_synchronous_requests @filter_by_host = false - @items = collection_items(collection_or_uri, fetch_all: true) + @items = collection_items(collection_or_uri) @items = filtered_replies return if @items.nil? diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index 2963f616e2..bb20e66649 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -5,8 +5,6 @@ class ActivityPub::FetchRepliesService < BaseService # Limit of fetched replies MAX_REPLIES = 5 - # Limit when fetching all (to prevent infinite fetch attack) - FETCH_ALL_MAX_REPLIES = 500 def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, filter_by_host: true) @account = parent_status.account @@ -23,7 +21,7 @@ class ActivityPub::FetchRepliesService < BaseService private - def collection_items(collection_or_uri, fetch_all: false) + def collection_items(collection_or_uri) collection = fetch_collection(collection_or_uri) return unless collection.is_a?(Hash) @@ -41,8 +39,7 @@ class ActivityPub::FetchRepliesService < BaseService all_items.concat(as_array(items)) - # Quit early if we are not fetching all replies or we've reached the absolute max - break if (!fetch_all && all_items.size >= MAX_REPLIES) || (all_items.size >= FETCH_ALL_MAX_REPLIES) + break if all_items.size > MAX_REPLIES collection = collection['next'].present? ? fetch_collection(collection['next']) : nil end From 70968032e359b71eca1ec8665c4cd86be736359c Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Mon, 21 Oct 2024 12:12:24 -0700 Subject: [PATCH 17/27] apply changes from patches that couldnt be auto-applied --- .env.production.sample | 24 +++++++++++++ app/controllers/api/v1/statuses_controller.rb | 1 - .../concerns/status/fetch_replies_concern.rb | 10 +++--- .../activitypub/fetch_all_replies_service.rb | 8 +++-- .../activitypub/fetch_all_replies_worker.rb | 35 ++++++++++++------- 5 files changed, 59 insertions(+), 19 deletions(-) diff --git a/.env.production.sample b/.env.production.sample index 3dd66abae4..324bbc5bf0 100644 --- a/.env.production.sample +++ b/.env.production.sample @@ -86,3 +86,27 @@ S3_ALIAS_HOST=files.example.com # ----------------------- IP_RETENTION_PERIOD=31556952 SESSION_RETENTION_PERIOD=31556952 + +# Fetch All Replies Behavior +# -------------------------- +# When a user expands a post (DetailedStatus view), fetch all of its replies +# (default: true if unset, set explicitly to ``false`` to disable) +FETCH_REPLIES_ENABLED=true + +# Period to wait between fetching replies (in minutes) +FETCH_REPLIES_DEBOUNCE=15 + +# Period to wait after a post is first created before fetching its replies (in minutes) +FETCH_REPLIES_CREATED_RECENTLY=5 + +# Max number of replies to fetch - total, recursively through a whole reply tree +FETCH_REPLIES_MAX_GLOBAL=1000 + +# Max number of replies to fetch - for a single post +FETCH_REPLIES_MAX_SINGLE=500 + +# Max number of replies to fetch - total, recursively through a whole reply tree +FETCH_REPLIES_MAX_GLOBAL=1000 + +# Max number of replies to fetch - for a single post +FETCH_REPLIES_MAX_SINGLE=500 diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index f131eaed09..87298a39ed 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -62,7 +62,6 @@ class Api::V1::StatusesController < Api::BaseController if !current_account.nil? && @status.should_fetch_replies? ActivityPub::FetchAllRepliesWorker.perform_async( @status.id, - current_account.id, { allow_synchronous_requests: true, } diff --git a/app/models/concerns/status/fetch_replies_concern.rb b/app/models/concerns/status/fetch_replies_concern.rb index 696e93ec88..62c908840a 100644 --- a/app/models/concerns/status/fetch_replies_concern.rb +++ b/app/models/concerns/status/fetch_replies_concern.rb @@ -3,10 +3,12 @@ module Status::FetchRepliesConcern extend ActiveSupport::Concern - # debounce fetching all replies to minimize DoS - FETCH_REPLIES_DEBOUNCE = 30.minutes + # enable/disable fetching all replies + FETCH_REPLIES_ENABLED = ENV.key?('FETCH_REPLIES_ENABLED') ? ENV['FETCH_REPLIES_ENABLED'] == 'true' : true - CREATED_RECENTLY_DEBOUNCE = 10.minutes + # debounce fetching all replies to minimize DoS + FETCH_REPLIES_DEBOUNCE = (ENV['FETCH_REPLIES_DEBOUNCE'] || 15).to_i.minutes + CREATED_RECENTLY_DEBOUNCE = (ENV['FETCH_REPLIES_CREATED_RECENTLY'] || 5).to_i.minutes included do scope :created_recently, -> { where(created_at: CREATED_RECENTLY_DEBOUNCE.ago..) } @@ -20,7 +22,7 @@ module Status::FetchRepliesConcern def should_fetch_replies? # we aren't brand new, and we haven't fetched replies since the debounce window - !local? && created_at <= CREATED_RECENTLY_DEBOUNCE.ago && ( + FETCH_REPLIES_ENABLED && !local? && created_at <= CREATED_RECENTLY_DEBOUNCE.ago && ( fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_DEBOUNCE.ago ) end diff --git a/app/services/activitypub/fetch_all_replies_service.rb b/app/services/activitypub/fetch_all_replies_service.rb index 82b7943907..50be4c9e8b 100644 --- a/app/services/activitypub/fetch_all_replies_service.rb +++ b/app/services/activitypub/fetch_all_replies_service.rb @@ -4,11 +4,12 @@ class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService include JsonLdHelper # Limit of replies to fetch per status - MAX_REPLIES = 500 + MAX_REPLIES = (ENV['FETCH_REPLIES_MAX_SINGLE'] || 500).to_i def call(collection_or_uri, allow_synchronous_requests: true, request_id: nil) @allow_synchronous_requests = allow_synchronous_requests @filter_by_host = false + @collection_or_uri = collection_or_uri @items = collection_items(collection_or_uri) @items = filtered_replies @@ -40,6 +41,9 @@ class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService Status.where(uri: uris).should_fetch_replies.touch_all(:fetched_replies_at) # Reject all statuses that we already have in the db - uris.reject { |uri| dont_update.include?(uri) }.take(MAX_REPLIES) + uris = uris.reject { |uri| dont_update.include?(uri) }.take(MAX_REPLIES) + + Rails.logger.debug { "FetchAllRepliesService - #{@collection_or_uri}: Fetching filtered statuses: #{uris}" } + uris end end diff --git a/app/workers/activitypub/fetch_all_replies_worker.rb b/app/workers/activitypub/fetch_all_replies_worker.rb index 53e132f61e..c8d0546891 100644 --- a/app/workers/activitypub/fetch_all_replies_worker.rb +++ b/app/workers/activitypub/fetch_all_replies_worker.rb @@ -12,18 +12,18 @@ class ActivityPub::FetchAllRepliesWorker sidekiq_options queue: 'pull', retry: 3 # Global max replies to fetch per request (all replies, recursively) - MAX_REPLIES = 1000 + MAX_REPLIES = (ENV['FETCH_REPLIES_MAX_GLOBAL'] || 1000).to_i - def perform(parent_status_id, current_account_id = nil, options = {}) + def perform(parent_status_id, options = {}) @parent_status = Status.find(parent_status_id) - @current_account_id = current_account_id - @current_account = @current_account_id.nil? ? nil : Account.find(@current_account_id) - Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: Fetching all replies for status: #{@parent_status}" } + Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: Fetching all replies for status: #{@parent_status}" } uris_to_fetch = get_replies(@parent_status.uri, options) - fetched_uris = uris_to_fetch.clone return if uris_to_fetch.nil? + @parent_status.touch(:fetched_replies_at) + fetched_uris = uris_to_fetch.clone.to_set + until uris_to_fetch.empty? || fetched_uris.length >= MAX_REPLIES next_reply = uris_to_fetch.pop next if next_reply.nil? @@ -31,8 +31,10 @@ class ActivityPub::FetchAllRepliesWorker new_reply_uris = get_replies(next_reply, options) next if new_reply_uris.nil? + new_reply_uris = new_reply_uris.reject { |uri| fetched_uris.include?(uri) } + uris_to_fetch.concat(new_reply_uris) - fetched_uris.concat(new_reply_uris) + fetched_uris = fetched_uris.merge(new_reply_uris) end Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: fetched #{fetched_uris.length} replies" } @@ -50,12 +52,21 @@ class ActivityPub::FetchAllRepliesWorker def get_replies_uri(parent_status_uri) begin - json_status = fetch_resource(parent_status_uri, true, @current_account) - replies_collection_or_uri = json_status['replies'] - Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status_id}: replies URI was nil" } if replies_collection_or_uri.nil? - replies_collection_or_uri + json_status = fetch_resource(parent_status_uri, true) + if json_status.nil? + Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: Could not get replies URI for #{parent_status_uri}, returned nil" } + nil + elsif !json_status.key?('replies') + Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: No replies collection found in ActivityPub object: #{json_status}" } + nil + else + json_status['replies'] + end rescue => e - Rails.logger.error { "FetchAllRepliesWorker - #{@parent_status_id}: Got exception while resolving replies URI: #{e} - #{e.message}" } + Rails.logger.error { "FetchAllRepliesWorker - #{@parent_status.uri}: Caught exception while resolving replies URI #{parent_status_uri}: #{e} - #{e.message}" } + # Raise if we can't get the collection for top-level status to trigger retry + raise e if parent_status_uri == @parent_status.uri + nil end end From c0fbb9292bf0c71d0465e51fc8a984c7407e0eae Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Mon, 21 Oct 2024 12:15:41 -0700 Subject: [PATCH 18/27] rm noisy schema.rb auto-changes --- db/schema.rb | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/db/schema.rb b/db/schema.rb index c1fbc3431b..224d052200 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -556,12 +556,12 @@ ActiveRecord::Schema[7.2].define(version: 2024_12_05_163118) do end create_table "ip_blocks", force: :cascade do |t| - t.datetime "created_at", precision: nil, null: false - t.datetime "updated_at", precision: nil, null: false - t.datetime "expires_at", precision: nil t.inet "ip", default: "0.0.0.0", null: false t.integer "severity", default: 0, null: false + t.datetime "expires_at", precision: nil t.text "comment", default: "", null: false + t.datetime "created_at", precision: nil, null: false + t.datetime "updated_at", precision: nil, null: false t.index ["ip"], name: "index_ip_blocks_on_ip", unique: true end @@ -1398,9 +1398,9 @@ ActiveRecord::Schema[7.2].define(version: 2024_12_05_163118) do add_index "instances", ["domain"], name: "index_instances_on_domain", unique: true create_view "user_ips", sql_definition: <<-SQL - SELECT t0.user_id, - t0.ip, - max(t0.used_at) AS used_at + SELECT user_id, + ip, + max(used_at) AS used_at FROM ( SELECT users.id AS user_id, users.sign_up_ip AS ip, users.created_at AS used_at @@ -1417,7 +1417,7 @@ ActiveRecord::Schema[7.2].define(version: 2024_12_05_163118) do login_activities.created_at FROM login_activities WHERE (login_activities.success = true)) t0 - GROUP BY t0.user_id, t0.ip; + GROUP BY user_id, ip; SQL create_view "account_summaries", materialized: true, sql_definition: <<-SQL SELECT accounts.id AS account_id, @@ -1438,9 +1438,9 @@ ActiveRecord::Schema[7.2].define(version: 2024_12_05_163118) do add_index "account_summaries", ["account_id"], name: "index_account_summaries_on_account_id", unique: true create_view "global_follow_recommendations", materialized: true, sql_definition: <<-SQL - SELECT t0.account_id, - sum(t0.rank) AS rank, - array_agg(t0.reason) AS reason + SELECT account_id, + sum(rank) AS rank, + array_agg(reason) AS reason FROM ( SELECT account_summaries.account_id, ((count(follows.id))::numeric / (1.0 + (count(follows.id))::numeric)) AS rank, 'most_followed'::text AS reason @@ -1464,8 +1464,8 @@ ActiveRecord::Schema[7.2].define(version: 2024_12_05_163118) do WHERE (follow_recommendation_suppressions.account_id = statuses.account_id))))) GROUP BY account_summaries.account_id HAVING (sum((status_stats.reblogs_count + status_stats.favourites_count)) >= (5)::numeric)) t0 - GROUP BY t0.account_id - ORDER BY (sum(t0.rank)) DESC; + GROUP BY account_id + ORDER BY (sum(rank)) DESC; SQL add_index "global_follow_recommendations", ["account_id"], name: "index_global_follow_recommendations_on_account_id", unique: true From 7aab38fbc0d00471ffd1e8b2312e87feddf42269 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Mon, 21 Oct 2024 14:17:36 -0700 Subject: [PATCH 19/27] rm duplicated env vars --- .env.production.sample | 6 ------ 1 file changed, 6 deletions(-) diff --git a/.env.production.sample b/.env.production.sample index 324bbc5bf0..c934a76e46 100644 --- a/.env.production.sample +++ b/.env.production.sample @@ -104,9 +104,3 @@ FETCH_REPLIES_MAX_GLOBAL=1000 # Max number of replies to fetch - for a single post FETCH_REPLIES_MAX_SINGLE=500 - -# Max number of replies to fetch - total, recursively through a whole reply tree -FETCH_REPLIES_MAX_GLOBAL=1000 - -# Max number of replies to fetch - for a single post -FETCH_REPLIES_MAX_SINGLE=500 From 5dbfebf756e974aa450663ed5ec74fb73e089a8e Mon Sep 17 00:00:00 2001 From: Jonny Saunders Date: Mon, 28 Oct 2024 09:36:04 -0700 Subject: [PATCH 20/27] Update app/services/activitypub/fetch_all_replies_service.rb Co-authored-by: Claire --- app/services/activitypub/fetch_all_replies_service.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/services/activitypub/fetch_all_replies_service.rb b/app/services/activitypub/fetch_all_replies_service.rb index 50be4c9e8b..1c17ba4732 100644 --- a/app/services/activitypub/fetch_all_replies_service.rb +++ b/app/services/activitypub/fetch_all_replies_service.rb @@ -41,7 +41,7 @@ class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService Status.where(uri: uris).should_fetch_replies.touch_all(:fetched_replies_at) # Reject all statuses that we already have in the db - uris = uris.reject { |uri| dont_update.include?(uri) }.take(MAX_REPLIES) + uris = (uris - dont_update).take(MAX_REPLIES) Rails.logger.debug { "FetchAllRepliesService - #{@collection_or_uri}: Fetching filtered statuses: #{uris}" } uris From 19b67e66bb34b3a71c2e59300c47e479e5a5c202 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Fri, 15 Nov 2024 02:26:17 -0800 Subject: [PATCH 21/27] add maximum page limit --- .env.production.sample | 3 +++ .../activitypub/fetch_all_replies_service.rb | 6 +++--- .../activitypub/fetch_replies_service.rb | 11 +++++++---- .../activitypub/fetch_all_replies_worker.rb | 12 +++++++----- .../activitypub/fetch_all_replies_worker_spec.rb | 16 ++++++++++------ 5 files changed, 30 insertions(+), 18 deletions(-) diff --git a/.env.production.sample b/.env.production.sample index c934a76e46..277f31b73b 100644 --- a/.env.production.sample +++ b/.env.production.sample @@ -104,3 +104,6 @@ FETCH_REPLIES_MAX_GLOBAL=1000 # Max number of replies to fetch - for a single post FETCH_REPLIES_MAX_SINGLE=500 + +# Max number of replies Collection pages to fetch - total +FETCH_REPLIES_MAX_PAGES=500 diff --git a/app/services/activitypub/fetch_all_replies_service.rb b/app/services/activitypub/fetch_all_replies_service.rb index 1c17ba4732..8771096feb 100644 --- a/app/services/activitypub/fetch_all_replies_service.rb +++ b/app/services/activitypub/fetch_all_replies_service.rb @@ -6,18 +6,18 @@ class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService # Limit of replies to fetch per status MAX_REPLIES = (ENV['FETCH_REPLIES_MAX_SINGLE'] || 500).to_i - def call(collection_or_uri, allow_synchronous_requests: true, request_id: nil) + def call(collection_or_uri, max_pages = nil, allow_synchronous_requests: true, request_id: nil) @allow_synchronous_requests = allow_synchronous_requests @filter_by_host = false @collection_or_uri = collection_or_uri - @items = collection_items(collection_or_uri) + @items, n_pages = collection_items(collection_or_uri, max_pages) @items = filtered_replies return if @items.nil? FetchReplyWorker.push_bulk(@items) { |reply_uri| [reply_uri, { 'request_id' => request_id }] } - @items + [@items, n_pages] end private diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index bb20e66649..b26e75f125 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -11,7 +11,7 @@ class ActivityPub::FetchRepliesService < BaseService @allow_synchronous_requests = allow_synchronous_requests @filter_by_host = filter_by_host - @items = collection_items(collection_or_uri) + @items, = collection_items(collection_or_uri) return if @items.nil? FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id }] } @@ -21,7 +21,7 @@ class ActivityPub::FetchRepliesService < BaseService private - def collection_items(collection_or_uri) + def collection_items(collection_or_uri, max_pages = nil) collection = fetch_collection(collection_or_uri) return unless collection.is_a?(Hash) @@ -29,6 +29,7 @@ class ActivityPub::FetchRepliesService < BaseService return unless collection.is_a?(Hash) all_items = [] + n_pages = 1 while collection.is_a?(Hash) items = case collection['type'] when 'Collection', 'CollectionPage' @@ -39,12 +40,14 @@ class ActivityPub::FetchRepliesService < BaseService all_items.concat(as_array(items)) - break if all_items.size > MAX_REPLIES + break if all_items.size >= MAX_REPLIES + break if !max_pages.nil? && n_pages >= max_pages collection = collection['next'].present? ? fetch_collection(collection['next']) : nil + n_pages += 1 end - all_items + [all_items, n_pages] end def fetch_collection(collection_or_uri) diff --git a/app/workers/activitypub/fetch_all_replies_worker.rb b/app/workers/activitypub/fetch_all_replies_worker.rb index c8d0546891..c5351b77af 100644 --- a/app/workers/activitypub/fetch_all_replies_worker.rb +++ b/app/workers/activitypub/fetch_all_replies_worker.rb @@ -13,28 +13,30 @@ class ActivityPub::FetchAllRepliesWorker # Global max replies to fetch per request (all replies, recursively) MAX_REPLIES = (ENV['FETCH_REPLIES_MAX_GLOBAL'] || 1000).to_i + MAX_PAGES = (ENV['FETCH_REPLIES_MAX_PAGES'] || 500).to_i def perform(parent_status_id, options = {}) @parent_status = Status.find(parent_status_id) Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: Fetching all replies for status: #{@parent_status}" } - uris_to_fetch = get_replies(@parent_status.uri, options) + uris_to_fetch, n_pages = get_replies(@parent_status.uri, MAX_PAGES, options) return if uris_to_fetch.nil? @parent_status.touch(:fetched_replies_at) fetched_uris = uris_to_fetch.clone.to_set - until uris_to_fetch.empty? || fetched_uris.length >= MAX_REPLIES + until uris_to_fetch.empty? || fetched_uris.length >= MAX_REPLIES || n_pages >= MAX_PAGES next_reply = uris_to_fetch.pop next if next_reply.nil? - new_reply_uris = get_replies(next_reply, options) + new_reply_uris, new_n_pages = get_replies(next_reply, MAX_PAGES - n_pages, options) next if new_reply_uris.nil? new_reply_uris = new_reply_uris.reject { |uri| fetched_uris.include?(uri) } uris_to_fetch.concat(new_reply_uris) fetched_uris = fetched_uris.merge(new_reply_uris) + n_pages += new_n_pages end Rails.logger.debug { "FetchAllRepliesWorker - #{parent_status_id}: fetched #{fetched_uris.length} replies" } @@ -43,11 +45,11 @@ class ActivityPub::FetchAllRepliesWorker private - def get_replies(status_uri, options = {}) + def get_replies(status_uri, max_pages, options = {}) replies_collection_or_uri = get_replies_uri(status_uri) return if replies_collection_or_uri.nil? - ActivityPub::FetchAllRepliesService.new.call(replies_collection_or_uri, **options.deep_symbolize_keys) + ActivityPub::FetchAllRepliesService.new.call(replies_collection_or_uri, max_pages, **options.deep_symbolize_keys) end def get_replies_uri(parent_status_uri) diff --git a/spec/workers/activitypub/fetch_all_replies_worker_spec.rb b/spec/workers/activitypub/fetch_all_replies_worker_spec.rb index 35fadb6caf..7346088371 100644 --- a/spec/workers/activitypub/fetch_all_replies_worker_spec.rb +++ b/spec/workers/activitypub/fetch_all_replies_worker_spec.rb @@ -122,20 +122,18 @@ RSpec.describe ActivityPub::FetchAllRepliesWorker do stub_request(:get, item).to_return(status: 200, body: Oj.dump(empty_object), headers: { 'Content-Type': 'application/activity+json' }) end + + stub_request(:get, top_note_uri).to_return(status: 200, body: Oj.dump(top_object), headers: { 'Content-Type': 'application/activity+json' }) + stub_request(:get, reply_note_uri).to_return(status: 200, body: Oj.dump(reply_object), headers: { 'Content-Type': 'application/activity+json' }) end shared_examples 'fetches all replies' do - before do - stub_request(:get, top_note_uri).to_return(status: 200, body: Oj.dump(top_object), headers: { 'Content-Type': 'application/activity+json' }) - stub_request(:get, reply_note_uri).to_return(status: 200, body: Oj.dump(reply_object), headers: { 'Content-Type': 'application/activity+json' }) - end - it 'fetches statuses recursively' do got_uris = subject.perform(status.id) expect(got_uris).to match_array(all_items) end - it 'respects the maxium limits set by not recursing after the max is reached' do + it 'respects the maximum limits set by not recursing after the max is reached' do stub_const('ActivityPub::FetchAllRepliesWorker::MAX_REPLIES', 5) got_uris = subject.perform(status.id) expect(got_uris).to match_array(top_items + top_items_paged) @@ -249,6 +247,12 @@ RSpec.describe ActivityPub::FetchAllRepliesWorker do end it_behaves_like 'fetches all replies' + + it 'limits by max pages' do + stub_const('ActivityPub::FetchAllRepliesWorker::MAX_PAGES', 3) + got_uris = subject.perform(status.id) + expect(got_uris).to match_array(top_items + top_items_paged + nested_items) + end end end end From e8d35842a08e5d23aa7d91fa3c7897b578cb6407 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Sun, 24 Nov 2024 17:00:21 -0800 Subject: [PATCH 22/27] Rename should_not_fetch_replies Co-authored-by: Kouhai <66407198+kouhaidev@users.noreply.github.com> --- app/models/concerns/status/fetch_replies_concern.rb | 2 +- app/services/activitypub/fetch_all_replies_service.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/app/models/concerns/status/fetch_replies_concern.rb b/app/models/concerns/status/fetch_replies_concern.rb index 62c908840a..daa4ec3c4f 100644 --- a/app/models/concerns/status/fetch_replies_concern.rb +++ b/app/models/concerns/status/fetch_replies_concern.rb @@ -16,7 +16,7 @@ module Status::FetchRepliesConcern scope :fetched_recently, -> { where(fetched_replies_at: FETCH_REPLIES_DEBOUNCE.ago..) } scope :not_fetched_recently, -> { where(fetched_replies_at: ..FETCH_REPLIES_DEBOUNCE.ago).or(where(fetched_replies_at: nil)) } - scope :shouldnt_fetch_replies, -> { local.merge(created_recently).merge(fetched_recently) } + scope :should_not_fetch_replies, -> { local.merge(created_recently).merge(fetched_recently) } scope :should_fetch_replies, -> { local.invert_where.merge(not_created_recently).merge(not_fetched_recently) } end diff --git a/app/services/activitypub/fetch_all_replies_service.rb b/app/services/activitypub/fetch_all_replies_service.rb index 8771096feb..807b489938 100644 --- a/app/services/activitypub/fetch_all_replies_service.rb +++ b/app/services/activitypub/fetch_all_replies_service.rb @@ -35,7 +35,7 @@ class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService # Typically we assume the number of replies we *shouldn't* fetch is smaller than the # replies we *should* fetch, so we also minimize the number of uris we should load here. uris = @items.map { |item| value_or_id(item) } - dont_update = Status.where(uri: uris).shouldnt_fetch_replies.pluck(:uri) + dont_update = Status.where(uri: uris).should_not_fetch_replies.pluck(:uri) # touch all statuses that already exist and that we're about to update Status.where(uri: uris).should_fetch_replies.touch_all(:fetched_replies_at) From af0779eb0b9817451db5293071bbcfec5a5a63cc Mon Sep 17 00:00:00 2001 From: Jonny Saunders Date: Sun, 24 Nov 2024 17:11:21 -0800 Subject: [PATCH 23/27] Quit early if shouldn't fetch replies within worker Co-authored-by: kouhaidev <66407198+kouhaidev@users.noreply.github.com> --- app/workers/activitypub/fetch_all_replies_worker.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/app/workers/activitypub/fetch_all_replies_worker.rb b/app/workers/activitypub/fetch_all_replies_worker.rb index c5351b77af..4a899c0cda 100644 --- a/app/workers/activitypub/fetch_all_replies_worker.rb +++ b/app/workers/activitypub/fetch_all_replies_worker.rb @@ -17,12 +17,13 @@ class ActivityPub::FetchAllRepliesWorker def perform(parent_status_id, options = {}) @parent_status = Status.find(parent_status_id) + return unless @parent_status.should_fetch_replies? + @parent_status.touch(:fetched_replies_at) Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: Fetching all replies for status: #{@parent_status}" } uris_to_fetch, n_pages = get_replies(@parent_status.uri, MAX_PAGES, options) return if uris_to_fetch.nil? - @parent_status.touch(:fetched_replies_at) fetched_uris = uris_to_fetch.clone.to_set until uris_to_fetch.empty? || fetched_uris.length >= MAX_REPLIES || n_pages >= MAX_PAGES From d97b1800b2a0f6d335736069f3cc3ef886b7a3b2 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Sun, 24 Nov 2024 17:12:39 -0800 Subject: [PATCH 24/27] lint --- app/workers/activitypub/fetch_all_replies_worker.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/app/workers/activitypub/fetch_all_replies_worker.rb b/app/workers/activitypub/fetch_all_replies_worker.rb index 4a899c0cda..e5c80b7935 100644 --- a/app/workers/activitypub/fetch_all_replies_worker.rb +++ b/app/workers/activitypub/fetch_all_replies_worker.rb @@ -18,6 +18,7 @@ class ActivityPub::FetchAllRepliesWorker def perform(parent_status_id, options = {}) @parent_status = Status.find(parent_status_id) return unless @parent_status.should_fetch_replies? + @parent_status.touch(:fetched_replies_at) Rails.logger.debug { "FetchAllRepliesWorker - #{@parent_status.uri}: Fetching all replies for status: #{@parent_status}" } From 63ed1ddd154f652f56bb90495765cf78ce07dc89 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Mon, 2 Dec 2024 19:48:13 -0800 Subject: [PATCH 25/27] add tests for early returning, fix tests to put root status outside of debounce window --- .../fetch_all_replies_worker_spec.rb | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/spec/workers/activitypub/fetch_all_replies_worker_spec.rb b/spec/workers/activitypub/fetch_all_replies_worker_spec.rb index 7346088371..481fc017df 100644 --- a/spec/workers/activitypub/fetch_all_replies_worker_spec.rb +++ b/spec/workers/activitypub/fetch_all_replies_worker_spec.rb @@ -113,7 +113,14 @@ RSpec.describe ActivityPub::FetchAllRepliesWorker do end let(:account) { Fabricate(:account, domain: 'example.com') } - let(:status) { Fabricate(:status, account: account, uri: top_note_uri) } + let(:status) do + Fabricate( + :status, + account: account, + uri: top_note_uri, + created_at: 1.day.ago - Status::FetchRepliesConcern::CREATED_RECENTLY_DEBOUNCE + ) + end before do allow(FetchReplyWorker).to receive(:push_bulk) @@ -254,5 +261,27 @@ RSpec.describe ActivityPub::FetchAllRepliesWorker do expect(got_uris).to match_array(top_items + top_items_paged + nested_items) end end + + context 'when replies should not be fetched' do + # ensure that we should not fetch by setting the status to be created in the debounce window + let(:status) do + Fabricate( + :status, + account: account, + uri: top_note_uri, + created_at: DateTime.now + ) + end + + before do + stub_const('Status::FetchRepliesConcern::CREATED_RECENTLY_DEBOUNCE', 1.week) + end + + it 'returns nil without fetching' do + got_uris = subject.perform(status.id) + expect(got_uris).to be_nil + assert_not_requested :get, top_note_uri + end + end end end From a7b0661d560dfb6e9a9f07a9a30d9e91aec237d1 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Mon, 2 Dec 2024 19:50:43 -0800 Subject: [PATCH 26/27] single line for fabricated status when its not humongous --- .../workers/activitypub/fetch_all_replies_worker_spec.rb | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/spec/workers/activitypub/fetch_all_replies_worker_spec.rb b/spec/workers/activitypub/fetch_all_replies_worker_spec.rb index 481fc017df..9df1ce498a 100644 --- a/spec/workers/activitypub/fetch_all_replies_worker_spec.rb +++ b/spec/workers/activitypub/fetch_all_replies_worker_spec.rb @@ -264,14 +264,7 @@ RSpec.describe ActivityPub::FetchAllRepliesWorker do context 'when replies should not be fetched' do # ensure that we should not fetch by setting the status to be created in the debounce window - let(:status) do - Fabricate( - :status, - account: account, - uri: top_note_uri, - created_at: DateTime.now - ) - end + let(:status) { Fabricate(:status, account: account, uri: top_note_uri, created_at: DateTime.now) } before do stub_const('Status::FetchRepliesConcern::CREATED_RECENTLY_DEBOUNCE', 1.week) From 1e54bf6ff2c5028b07d37d3170695272787184c3 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Mon, 2 Dec 2024 19:55:34 -0800 Subject: [PATCH 27/27] rm allow_synchronous_requests param, since it always needs to be true --- app/services/activitypub/fetch_all_replies_service.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/services/activitypub/fetch_all_replies_service.rb b/app/services/activitypub/fetch_all_replies_service.rb index 807b489938..34b5408c1c 100644 --- a/app/services/activitypub/fetch_all_replies_service.rb +++ b/app/services/activitypub/fetch_all_replies_service.rb @@ -6,8 +6,8 @@ class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService # Limit of replies to fetch per status MAX_REPLIES = (ENV['FETCH_REPLIES_MAX_SINGLE'] || 500).to_i - def call(collection_or_uri, max_pages = nil, allow_synchronous_requests: true, request_id: nil) - @allow_synchronous_requests = allow_synchronous_requests + def call(collection_or_uri, max_pages = nil, request_id: nil) + @allow_synchronous_requests = true @filter_by_host = false @collection_or_uri = collection_or_uri