diff --git a/app/lib/activitypub/activity/create.rb b/app/lib/activitypub/activity/create.rb index 5d700b49613..8c23daec821 100644 --- a/app/lib/activitypub/activity/create.rb +++ b/app/lib/activitypub/activity/create.rb @@ -88,6 +88,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity end resolve_thread(@status) + fixup_thread(@status) fetch_replies(@status) distribute forward_for_reply @@ -334,6 +335,10 @@ class ActivityPub::Activity::Create < ActivityPub::Activity ThreadResolveWorker.perform_async(status.id, in_reply_to_uri, { 'request_id' => @options[:request_id] }) end + def fixup_thread(status) + ThreadRepair.new(status.uri).reattach_orphaned_children!(status) + end + def fetch_replies(status) collection = @object['replies'] return if collection.nil? diff --git a/app/lib/thread_repair.rb b/app/lib/thread_repair.rb new file mode 100644 index 00000000000..8a47249628f --- /dev/null +++ b/app/lib/thread_repair.rb @@ -0,0 +1,44 @@ +# frozen_string_literal: true + +class ThreadRepair + include Redisable + + THREAD_FIXUP_WINDOW = 1.hour.to_i + + def initialize(parent_uri) + @parent_uri = parent_uri + end + + def find_parent(child_id) + with_redis do |redis| + redis.sadd(redis_key, child_id) + redis.expire(redis_key, THREAD_FIXUP_WINDOW) + + parent = ActivityPub::TagManager.instance.uri_to_resource(@parent_uri, Status) + redis.srem(redis_key, child_id) if parent.present? + + parent + end + end + + def reattach_orphaned_children!(parent) + with_redis do |redis| + redis.sscan_each(redis_key, count: 1000) do |ids| + statuses = Status.where(id: ids).to_a + + statuses.each { |status| status.update(thread: parent) } + + # Updated statuses need to be distributed to clients/inserted in TLs + DistributionWorker.push_bulk(statuses.filter(&:within_realtime_window?)) do |status| + [status.id, { 'skip_notifications' => true }] + end + + redis.srem(redis_key, ids) + end + end + end + + def redis_key + "thread_repair:#{@parent_uri}" + end +end diff --git a/app/workers/thread_resolve_worker.rb b/app/workers/thread_resolve_worker.rb index d4cefb3fdc0..9661c87123a 100644 --- a/app/workers/thread_resolve_worker.rb +++ b/app/workers/thread_resolve_worker.rb @@ -13,6 +13,11 @@ class ThreadResolveWorker parent_status = ActivityPub::TagManager.instance.uri_to_resource(parent_url, Status) parent_status ||= FetchRemoteStatusService.new.call(parent_url, **options.deep_symbolize_keys) + # The parent post does not exist or is private, so take note of it so the thread can be + # reconstructed if the parent arrives. + # This will re-check the database to avoid race conditions + parent_status ||= ThreadRepair.new(parent_url).find_parent(child_status_id) + return if parent_status.nil? child_status.thread = parent_status