mirror of https://github.com/tootsuite/mastodon
Fix threading of private posts received out of order
parent
f0716368e6
commit
868de92720
|
@ -56,6 +56,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
|
||||||
end
|
end
|
||||||
|
|
||||||
resolve_thread(@status)
|
resolve_thread(@status)
|
||||||
|
fixup_thread(@status)
|
||||||
resolve_unresolved_mentions(@status)
|
resolve_unresolved_mentions(@status)
|
||||||
fetch_replies(@status)
|
fetch_replies(@status)
|
||||||
distribute
|
distribute
|
||||||
|
@ -305,6 +306,10 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
|
||||||
ThreadResolveWorker.perform_async(status.id, in_reply_to_uri, { 'request_id' => @options[:request_id] })
|
ThreadResolveWorker.perform_async(status.id, in_reply_to_uri, { 'request_id' => @options[:request_id] })
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def fixup_thread(status)
|
||||||
|
ThreadRepair.new(status.uri).reattach_orphaned_children!(status)
|
||||||
|
end
|
||||||
|
|
||||||
def resolve_unresolved_mentions(status)
|
def resolve_unresolved_mentions(status)
|
||||||
@unresolved_mentions.uniq.each do |uri|
|
@unresolved_mentions.uniq.each do |uri|
|
||||||
MentionResolveWorker.perform_in(rand(30...600).seconds, status.id, uri, { 'request_id' => @options[:request_id] })
|
MentionResolveWorker.perform_in(rand(30...600).seconds, status.id, uri, { 'request_id' => @options[:request_id] })
|
||||||
|
|
|
@ -0,0 +1,44 @@
|
||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
class ThreadRepair
|
||||||
|
include Redisable
|
||||||
|
|
||||||
|
THREAD_FIXUP_WINDOW = 1.hour.to_i
|
||||||
|
|
||||||
|
def initialize(parent_uri)
|
||||||
|
@parent_uri = parent_uri
|
||||||
|
end
|
||||||
|
|
||||||
|
def find_parent(child_id)
|
||||||
|
with_redis do |redis|
|
||||||
|
redis.sadd(redis_key, child_id)
|
||||||
|
redis.expire(redis_key, THREAD_FIXUP_WINDOW)
|
||||||
|
|
||||||
|
parent = ActivityPub::TagManager.instance.uri_to_resource(@parent_uri, Status)
|
||||||
|
redis.srem(redis_key, child_id) if parent.present?
|
||||||
|
|
||||||
|
parent
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def reattach_orphaned_children!(parent)
|
||||||
|
with_redis do |redis|
|
||||||
|
redis.sscan_each(redis_key, count: 1000) do |ids|
|
||||||
|
statuses = Status.where(id: ids).to_a
|
||||||
|
|
||||||
|
statuses.each { |status| status.update(thread: parent) }
|
||||||
|
|
||||||
|
# Updated statuses need to be distributed to clients/inserted in TLs
|
||||||
|
DistributionWorker.push_bulk(statuses.filter(&:within_realtime_window?)) do |status|
|
||||||
|
[status.id, { 'skip_notifications' => true }]
|
||||||
|
end
|
||||||
|
|
||||||
|
redis.srem(redis_key, ids)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def redis_key
|
||||||
|
"thread_repair:#{@parent_uri}"
|
||||||
|
end
|
||||||
|
end
|
|
@ -13,6 +13,11 @@ class ThreadResolveWorker
|
||||||
parent_status = ActivityPub::TagManager.instance.uri_to_resource(parent_url, Status)
|
parent_status = ActivityPub::TagManager.instance.uri_to_resource(parent_url, Status)
|
||||||
parent_status ||= FetchRemoteStatusService.new.call(parent_url, **options.deep_symbolize_keys)
|
parent_status ||= FetchRemoteStatusService.new.call(parent_url, **options.deep_symbolize_keys)
|
||||||
|
|
||||||
|
# The parent post does not exist or is private, so take note of it so the thread can be
|
||||||
|
# reconstructed if the parent arrives.
|
||||||
|
# This will re-check the database to avoid race conditions
|
||||||
|
parent_status ||= ThreadRepair.new(parent_url).find_parent(child_status_id)
|
||||||
|
|
||||||
return if parent_status.nil?
|
return if parent_status.nil?
|
||||||
|
|
||||||
child_status.thread = parent_status
|
child_status.thread = parent_status
|
||||||
|
|
Loading…
Reference in New Issue