mirror of
https://github.com/mastodon/mastodon.git
synced 2024-08-20 21:08:15 -07:00
Fix threading of private posts received out of order
This commit is contained in:
parent
693d9b03ed
commit
52c56ed5a6
3 changed files with 54 additions and 0 deletions
|
@ -88,6 +88,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
|
||||||
end
|
end
|
||||||
|
|
||||||
resolve_thread(@status)
|
resolve_thread(@status)
|
||||||
|
fixup_thread(@status)
|
||||||
fetch_replies(@status)
|
fetch_replies(@status)
|
||||||
distribute
|
distribute
|
||||||
forward_for_reply
|
forward_for_reply
|
||||||
|
@ -334,6 +335,10 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
|
||||||
ThreadResolveWorker.perform_async(status.id, in_reply_to_uri, { 'request_id' => @options[:request_id] })
|
ThreadResolveWorker.perform_async(status.id, in_reply_to_uri, { 'request_id' => @options[:request_id] })
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def fixup_thread(status)
|
||||||
|
ThreadRepair.new(status.uri).reattach_orphaned_children!(status)
|
||||||
|
end
|
||||||
|
|
||||||
def fetch_replies(status)
|
def fetch_replies(status)
|
||||||
collection = @object['replies']
|
collection = @object['replies']
|
||||||
return if collection.nil?
|
return if collection.nil?
|
||||||
|
|
44
app/lib/thread_repair.rb
Normal file
44
app/lib/thread_repair.rb
Normal file
|
@ -0,0 +1,44 @@
|
||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
class ThreadRepair
|
||||||
|
include Redisable
|
||||||
|
|
||||||
|
THREAD_FIXUP_WINDOW = 1.hour.to_i
|
||||||
|
|
||||||
|
def initialize(parent_uri)
|
||||||
|
@parent_uri = parent_uri
|
||||||
|
end
|
||||||
|
|
||||||
|
def find_parent(child_id)
|
||||||
|
with_redis do |redis|
|
||||||
|
redis.sadd(redis_key, child_id)
|
||||||
|
redis.expire(redis_key, THREAD_FIXUP_WINDOW)
|
||||||
|
|
||||||
|
parent = ActivityPub::TagManager.instance.uri_to_resource(@parent_uri, Status)
|
||||||
|
redis.srem(redis_key, child_id) if parent.present?
|
||||||
|
|
||||||
|
parent
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def reattach_orphaned_children!(parent)
|
||||||
|
with_redis do |redis|
|
||||||
|
redis.sscan_each(redis_key, count: 1000) do |ids|
|
||||||
|
statuses = Status.where(id: ids).to_a
|
||||||
|
|
||||||
|
statuses.each { |status| status.update(thread: parent) }
|
||||||
|
|
||||||
|
# Updated statuses need to be distributed to clients/inserted in TLs
|
||||||
|
DistributionWorker.push_bulk(statuses.filter(&:within_realtime_window?)) do |status|
|
||||||
|
[status.id, { 'skip_notifications' => true }]
|
||||||
|
end
|
||||||
|
|
||||||
|
redis.srem(redis_key, ids)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def redis_key
|
||||||
|
"thread_repair:#{@parent_uri}"
|
||||||
|
end
|
||||||
|
end
|
|
@ -13,6 +13,11 @@ class ThreadResolveWorker
|
||||||
parent_status = ActivityPub::TagManager.instance.uri_to_resource(parent_url, Status)
|
parent_status = ActivityPub::TagManager.instance.uri_to_resource(parent_url, Status)
|
||||||
parent_status ||= FetchRemoteStatusService.new.call(parent_url, **options.deep_symbolize_keys)
|
parent_status ||= FetchRemoteStatusService.new.call(parent_url, **options.deep_symbolize_keys)
|
||||||
|
|
||||||
|
# The parent post does not exist or is private, so take note of it so the thread can be
|
||||||
|
# reconstructed if the parent arrives.
|
||||||
|
# This will re-check the database to avoid race conditions
|
||||||
|
parent_status ||= ThreadRepair.new(parent_url).find_parent(child_status_id)
|
||||||
|
|
||||||
return if parent_status.nil?
|
return if parent_status.nil?
|
||||||
|
|
||||||
child_status.thread = parent_status
|
child_status.thread = parent_status
|
||||||
|
|
Loading…
Reference in a new issue