From 868de92720dce16e8d744f4f69d725ab4a776843 Mon Sep 17 00:00:00 2001 From: Claire Date: Fri, 26 Jul 2024 14:49:06 +0200 Subject: [PATCH] Fix threading of private posts received out of order --- app/lib/activitypub/activity/create.rb | 5 +++ app/lib/thread_repair.rb | 44 ++++++++++++++++++++++++++ app/workers/thread_resolve_worker.rb | 5 +++ 3 files changed, 54 insertions(+) create mode 100644 app/lib/thread_repair.rb diff --git a/app/lib/activitypub/activity/create.rb b/app/lib/activitypub/activity/create.rb index 928803cd64..319f1189fe 100644 --- a/app/lib/activitypub/activity/create.rb +++ b/app/lib/activitypub/activity/create.rb @@ -56,6 +56,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity end resolve_thread(@status) + fixup_thread(@status) resolve_unresolved_mentions(@status) fetch_replies(@status) distribute @@ -305,6 +306,10 @@ class ActivityPub::Activity::Create < ActivityPub::Activity ThreadResolveWorker.perform_async(status.id, in_reply_to_uri, { 'request_id' => @options[:request_id] }) end + def fixup_thread(status) + ThreadRepair.new(status.uri).reattach_orphaned_children!(status) + end + def resolve_unresolved_mentions(status) @unresolved_mentions.uniq.each do |uri| MentionResolveWorker.perform_in(rand(30...600).seconds, status.id, uri, { 'request_id' => @options[:request_id] }) diff --git a/app/lib/thread_repair.rb b/app/lib/thread_repair.rb new file mode 100644 index 0000000000..8a47249628 --- /dev/null +++ b/app/lib/thread_repair.rb @@ -0,0 +1,44 @@ +# frozen_string_literal: true + +class ThreadRepair + include Redisable + + THREAD_FIXUP_WINDOW = 1.hour.to_i + + def initialize(parent_uri) + @parent_uri = parent_uri + end + + def find_parent(child_id) + with_redis do |redis| + redis.sadd(redis_key, child_id) + redis.expire(redis_key, THREAD_FIXUP_WINDOW) + + parent = ActivityPub::TagManager.instance.uri_to_resource(@parent_uri, Status) + redis.srem(redis_key, child_id) if parent.present? + + parent + end + end + + def reattach_orphaned_children!(parent) + with_redis do |redis| + redis.sscan_each(redis_key, count: 1000) do |ids| + statuses = Status.where(id: ids).to_a + + statuses.each { |status| status.update(thread: parent) } + + # Updated statuses need to be distributed to clients/inserted in TLs + DistributionWorker.push_bulk(statuses.filter(&:within_realtime_window?)) do |status| + [status.id, { 'skip_notifications' => true }] + end + + redis.srem(redis_key, ids) + end + end + end + + def redis_key + "thread_repair:#{@parent_uri}" + end +end diff --git a/app/workers/thread_resolve_worker.rb b/app/workers/thread_resolve_worker.rb index d4cefb3fdc..9661c87123 100644 --- a/app/workers/thread_resolve_worker.rb +++ b/app/workers/thread_resolve_worker.rb @@ -13,6 +13,11 @@ class ThreadResolveWorker parent_status = ActivityPub::TagManager.instance.uri_to_resource(parent_url, Status) parent_status ||= FetchRemoteStatusService.new.call(parent_url, **options.deep_symbolize_keys) + # The parent post does not exist or is private, so take note of it so the thread can be + # reconstructed if the parent arrives. + # This will re-check the database to avoid race conditions + parent_status ||= ThreadRepair.new(parent_url).find_parent(child_status_id) + return if parent_status.nil? child_status.thread = parent_status