diff --git a/app/lib/access_token_extension.rb b/app/lib/access_token_extension.rb index 6e06f988a5e..7313330b98b 100644 --- a/app/lib/access_token_extension.rb +++ b/app/lib/access_token_extension.rb @@ -24,6 +24,9 @@ module AccessTokenExtension end def push_to_streaming_api - redis.publish("timeline:access_token:#{id}", Oj.dump(event: :kill)) if revoked? || destroyed? + if revoked? || destroyed? + redis.publish("timeline:access_token:#{id}", Oj.dump(event: :kill)) + redis.publish('system', Oj.dump(event: :terminate, access_tokens: [id])) + end end end diff --git a/app/lib/application_extension.rb b/app/lib/application_extension.rb index d7aaeba5bde..761d2ff7525 100644 --- a/app/lib/application_extension.rb +++ b/app/lib/application_extension.rb @@ -40,6 +40,8 @@ module ApplicationExtension pipeline.publish("timeline:access_token:#{id}", payload) end end + + redis.publish('system', Oj.dump(event: :terminate, access_tokens: tokens.ids)) end end end diff --git a/app/models/account_domain_block.rb b/app/models/account_domain_block.rb index 753935d6af6..0614ce6c1d4 100644 --- a/app/models/account_domain_block.rb +++ b/app/models/account_domain_block.rb @@ -18,6 +18,8 @@ class AccountDomainBlock < ApplicationRecord belongs_to :account validates :domain, presence: true, uniqueness: { scope: :account_id }, domain: true + after_destroy :notify_streaming + after_commit :invalidate_domain_blocking_cache after_commit :invalidate_follow_recommendations_cache @@ -31,4 +33,8 @@ class AccountDomainBlock < ApplicationRecord def invalidate_follow_recommendations_cache Rails.cache.delete("follow_recommendations/#{account_id}") end + + def notify_streaming + AfterUnblockDomainFromAccountService.call(account_id, domain) + end end diff --git a/app/models/custom_filter.rb b/app/models/custom_filter.rb index bacf1582618..30cc4f5abc2 100644 --- a/app/models/custom_filter.rb +++ b/app/models/custom_filter.rb @@ -114,6 +114,8 @@ class CustomFilter < ApplicationRecord Rails.cache.delete("filters:v3:#{account_id}") redis.publish("timeline:#{account_id}", Oj.dump(event: :filters_changed)) redis.publish("timeline:system:#{account_id}", Oj.dump(event: :filters_changed)) + + redis.publish('system', Oj.dump(event: :filters_changed, account_id: account_id)) end private diff --git a/app/models/user.rb b/app/models/user.rb index 72854569260..72a3577aa01 100644 --- a/app/models/user.rb +++ b/app/models/user.rb @@ -342,19 +342,21 @@ class User < ApplicationRecord def revoke_access! Doorkeeper::AccessGrant.by_resource_owner(self).update_all(revoked_at: Time.now.utc) - Doorkeeper::AccessToken.by_resource_owner(self).in_batches do |batch| - batch.update_all(revoked_at: Time.now.utc) - Web::PushSubscription.where(access_token_id: batch).delete_all + Doorkeeper::AccessToken.by_resource_owner(self).in_batches do |tokens| + tokens.update_all(revoked_at: Time.now.utc) + Web::PushSubscription.where(access_token_id: tokens).delete_all # Revoke each access token for the Streaming API, since `update_all`` # doesn't trigger ActiveRecord Callbacks: # TODO: #28793 Combine into a single topic payload = Oj.dump(event: :kill) redis.pipelined do |pipeline| - batch.ids.each do |id| + tokens.ids.each do |id| pipeline.publish("timeline:access_token:#{id}", payload) end end + + redis.publish('system', Oj.dump(event: :terminate, access_tokens: tokens.ids)) end end diff --git a/app/services/after_block_domain_from_account_service.rb b/app/services/after_block_domain_from_account_service.rb index fc5dc656810..3f611508229 100644 --- a/app/services/after_block_domain_from_account_service.rb +++ b/app/services/after_block_domain_from_account_service.rb @@ -2,6 +2,7 @@ class AfterBlockDomainFromAccountService < BaseService include Payloadable + include Redisable # This service does not create an AccountDomainBlock record, # it's meant to be called after such a record has been created @@ -16,7 +17,9 @@ class AfterBlockDomainFromAccountService < BaseService remove_follows! reject_existing_followers! reject_pending_follow_requests! + notify_of_severed_relationships! + notify_streaming! end private @@ -67,4 +70,8 @@ class AfterBlockDomainFromAccountService < BaseService def domain_block_event @domain_block_event ||= RelationshipSeveranceEvent.create!(type: :user_domain_block, target_name: @domain) end + + def notify_streaming! + redis.publish('system', Oj.dump(event: :domain_blocks_changed, account: @account.id, target_domain: @domain)) + end end diff --git a/app/services/after_block_service.rb b/app/services/after_block_service.rb index 899e84be44f..6f4c8a0b45f 100644 --- a/app/services/after_block_service.rb +++ b/app/services/after_block_service.rb @@ -1,6 +1,8 @@ # frozen_string_literal: true class AfterBlockService < BaseService + include Redisable + def call(account, target_account) @account = account @target_account = target_account @@ -9,6 +11,8 @@ class AfterBlockService < BaseService clear_list_feeds! clear_notifications! clear_conversations! + + notify_streaming! end private @@ -28,4 +32,8 @@ class AfterBlockService < BaseService def clear_notifications! Notification.where(account: @account).where(from_account: @target_account).in_batches.delete_all end + + def notify_streaming! + redis.publish('system', Oj.dump(event: :blocks_changed, account: @account.id, target_account: @target_account.id)) + end end diff --git a/app/services/after_unblock_domain_from_account_service.rb b/app/services/after_unblock_domain_from_account_service.rb new file mode 100644 index 00000000000..351207c8bee --- /dev/null +++ b/app/services/after_unblock_domain_from_account_service.rb @@ -0,0 +1,12 @@ +# frozen_string_literal: true + +class AfterUnblockDomainFromAccountService < BaseService + include Redisable + + # This service does not delete an AccountDomainBlock record, + # it's meant to be called after such a record has been created + # synchronously, to "clean up" + def call(account, domain) + redis.publish('system', Oj.dump(event: :domain_blocks_changed, account: account.id, target_domain: domain)) + end +end diff --git a/app/services/unblock_service.rb b/app/services/unblock_service.rb index c263ac8afe0..c028c2d5fc6 100644 --- a/app/services/unblock_service.rb +++ b/app/services/unblock_service.rb @@ -2,12 +2,16 @@ class UnblockService < BaseService include Payloadable + include Redisable def call(account, target_account) return unless account.blocking?(target_account) unblock = account.unblock!(target_account) create_notification(unblock) if !target_account.local? && target_account.activitypub? + + redis.publish('system', Oj.dump(event: :blocks_changed, account: account.id, target_account: target_account.id)) + unblock end diff --git a/app/services/unmute_service.rb b/app/services/unmute_service.rb index 6aeea358f75..52c1b8f9236 100644 --- a/app/services/unmute_service.rb +++ b/app/services/unmute_service.rb @@ -1,11 +1,15 @@ # frozen_string_literal: true class UnmuteService < BaseService + include Redisable + def call(account, target_account) return unless account.muting?(target_account) account.unmute!(target_account) MergeWorker.perform_async(target_account.id, account.id) if account.following?(target_account) + + redis.publish('system', Oj.dump(event: :mutes_changed, account: account.id, target_account: target_account.id)) end end diff --git a/app/workers/mute_worker.rb b/app/workers/mute_worker.rb index c74f657cbac..47aeaf1990f 100644 --- a/app/workers/mute_worker.rb +++ b/app/workers/mute_worker.rb @@ -2,10 +2,25 @@ class MuteWorker include Sidekiq::Worker + include Redisable def perform(account_id, target_account_id) - FeedManager.instance.clear_from_home(Account.find(account_id), Account.find(target_account_id)) + @account = Account.find(account_id) + @target_account = Account.find(target_account_id) + + clear_home_feed! + notify_streaming! rescue ActiveRecord::RecordNotFound true end + + private + + def clear_home_feed! + FeedManager.instance.clear_from_home(@account, @target_account) + end + + def notify_streaming! + redis.publish('system', Oj.dump(event: :mutes_changed, account: @account.id, target_account: @target_account.id)) + end end diff --git a/spec/controllers/oauth/authorized_applications_controller_spec.rb b/spec/controllers/oauth/authorized_applications_controller_spec.rb index 3fd9f9499f4..2eebface625 100644 --- a/spec/controllers/oauth/authorized_applications_controller_spec.rb +++ b/spec/controllers/oauth/authorized_applications_controller_spec.rb @@ -48,13 +48,16 @@ describe Oauth::AuthorizedApplicationsController do describe 'DELETE #destroy' do let!(:user) { Fabricate(:user) } let!(:application) { Fabricate(:application) } - let!(:access_token) { Fabricate(:accessible_access_token, application: application, resource_owner_id: user.id) } - let!(:web_push_subscription) { Fabricate(:web_push_subscription, user: user, access_token: access_token) } + let!(:access_tokens) { Fabricate.times(3, :accessible_access_token, application: application, resource_owner_id: user.id) } + let!(:web_push_subscription) { Fabricate(:web_push_subscription, user: user, access_token: access_tokens[0]) } let(:redis_pipeline_stub) { instance_double(Redis::Namespace, publish: nil) } before do sign_in user, scope: :user + allow(redis).to receive(:pipelined).and_yield(redis_pipeline_stub) + allow(redis).to receive(:publish) + post :destroy, params: { id: application.id } end @@ -71,7 +74,11 @@ describe Oauth::AuthorizedApplicationsController do end it 'sends a session kill payload to the streaming server' do - expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", '{"event":"kill"}') + access_tokens.each do |access_token| + expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", '{"event":"kill"}') + end + + expect(redis).to have_received(:publish).with('system', Oj.dump(event: :terminate, access_tokens: access_tokens.map(&:id))) end end end diff --git a/spec/controllers/settings/applications_controller_spec.rb b/spec/controllers/settings/applications_controller_spec.rb index ce2e0749a76..f5f2a7209fd 100644 --- a/spec/controllers/settings/applications_controller_spec.rb +++ b/spec/controllers/settings/applications_controller_spec.rb @@ -148,10 +148,12 @@ describe Settings::ApplicationsController do describe 'destroy' do let(:redis_pipeline_stub) { instance_double(Redis::Namespace, publish: nil) } - let!(:access_token) { Fabricate(:accessible_access_token, application: app) } + let!(:access_tokens) { Fabricate.times(3, :accessible_access_token, application: app) } before do allow(redis).to receive(:pipelined).and_yield(redis_pipeline_stub) + allow(redis).to receive(:publish) + post :destroy, params: { id: app.id } end @@ -161,7 +163,11 @@ describe Settings::ApplicationsController do end it 'sends a session kill payload to the streaming server' do - expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", '{"event":"kill"}') + access_tokens.each do |access_token| + expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", '{"event":"kill"}') + end + + expect(redis).to have_received(:publish).with('system', Oj.dump({ event: :terminate, access_tokens: access_tokens.map(&:id) })) end end diff --git a/spec/models/user_spec.rb b/spec/models/user_spec.rb index 4755500fc4e..a64e5f3374d 100644 --- a/spec/models/user_spec.rb +++ b/spec/models/user_spec.rb @@ -466,13 +466,14 @@ RSpec.describe User do subject(:user) { Fabricate(:user, password: 'foobar12345') } let!(:session_activation) { Fabricate(:session_activation, user: user) } - let!(:access_token) { Fabricate(:access_token, resource_owner_id: user.id) } - let!(:web_push_subscription) { Fabricate(:web_push_subscription, access_token: access_token) } + let!(:access_tokens) { Fabricate.times(3, :access_token, resource_owner_id: user.id) } + let!(:web_push_subscription) { Fabricate(:web_push_subscription, access_token: access_tokens[0]) } let(:redis_pipeline_stub) { instance_double(Redis::Namespace, publish: nil) } before do allow(redis).to receive(:pipelined).and_yield(redis_pipeline_stub) + allow(redis).to receive(:publish) user.reset_password! end @@ -490,11 +491,15 @@ RSpec.describe User do end it 'revokes streaming access for all access tokens' do - expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", Oj.dump(event: :kill)).once + access_tokens.each do |access_token| + expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", Oj.dump(event: :kill)).once + end + + expect(redis).to have_received(:publish).with('system', Oj.dump(event: :terminate, access_tokens: access_tokens.map(&:id))).once end it 'removes push subscriptions' do - expect(Web::PushSubscription.where(user: user).or(Web::PushSubscription.where(access_token: access_token)).count).to eq 0 + expect(Web::PushSubscription.where(user: user).or(Web::PushSubscription.where(access_token: access_tokens[0])).count).to eq 0 expect { web_push_subscription.reload }.to raise_error(ActiveRecord::RecordNotFound) end end diff --git a/spec/requests/api/v2/filters_spec.rb b/spec/requests/api/v2/filters_spec.rb index fd0483abbe5..3de0157202d 100644 --- a/spec/requests/api/v2/filters_spec.rb +++ b/spec/requests/api/v2/filters_spec.rb @@ -199,6 +199,7 @@ RSpec.describe 'Filters' do subject expect(redis).to have_received(:publish).with("timeline:#{user.account.id}", Oj.dump(event: :filters_changed)).once + expect(redis).to have_received(:publish).with('system', Oj.dump(event: :filters_changed, account_id: user.account.id)).once end end diff --git a/spec/services/after_block_domain_from_account_service_spec.rb b/spec/services/after_block_domain_from_account_service_spec.rb index 248648a809e..31a153d1184 100644 --- a/spec/services/after_block_domain_from_account_service_spec.rb +++ b/spec/services/after_block_domain_from_account_service_spec.rb @@ -5,8 +5,9 @@ require 'rails_helper' RSpec.describe AfterBlockDomainFromAccountService do subject { described_class.new } - let(:wolf) { Fabricate(:account, username: 'wolf', domain: 'evil.org', inbox_url: 'https://evil.org/wolf/inbox', protocol: :activitypub) } - let(:dog) { Fabricate(:account, username: 'dog', domain: 'evil.org', inbox_url: 'https://evil.org/dog/inbox', protocol: :activitypub) } + let(:target_domain) { 'evil.org' } + let(:wolf) { Fabricate(:account, username: 'wolf', domain: target_domain, inbox_url: 'https://evil.org/wolf/inbox', protocol: :activitypub) } + let(:dog) { Fabricate(:account, username: 'dog', domain: target_domain, inbox_url: 'https://evil.org/dog/inbox', protocol: :activitypub) } let(:alice) { Fabricate(:account, username: 'alice') } before do @@ -17,7 +18,7 @@ RSpec.describe AfterBlockDomainFromAccountService do end it 'purge followers from blocked domain, remove notification permissions, sends `Reject->Follow`, and records severed relationships', :aggregate_failures do - expect { subject.call(alice, 'evil.org') } + expect { subject.call(alice, target_domain) } .to change { wolf.following?(alice) }.from(true).to(false) .and change { NotificationPermission.exists?(account: alice, from_account: wolf) }.from(true).to(false) @@ -31,4 +32,16 @@ RSpec.describe AfterBlockDomainFromAccountService do expect(severed_relationships[0].relationship_severance_event).to eq severed_relationships[1].relationship_severance_event expect(severed_relationships.map { |rel| [rel.account, rel.target_account] }).to contain_exactly([wolf, alice], [alice, dog]) end + + describe 'streaming integration' do + before do + allow(redis).to receive(:publish) + end + + it 'notifies streaming of the domain blocks change' do + subject.call(alice, target_domain) + + expect(redis).to have_received(:publish).with('system', Oj.dump(event: :domain_blocks_changed, account: alice.id, target_domain: target_domain)).once + end + end end diff --git a/spec/services/after_block_service_spec.rb b/spec/services/after_block_service_spec.rb index 82825dad982..6ddfb0de5bf 100644 --- a/spec/services/after_block_service_spec.rb +++ b/spec/services/after_block_service_spec.rb @@ -48,4 +48,16 @@ RSpec.describe AfterBlockService do }.from([status.id.to_s, other_account_status.id.to_s, other_account_reblog.id.to_s]).to([other_account_status.id.to_s]) end end + + describe 'streaming integration' do + before do + allow(redis).to receive(:publish) + end + + it 'notifies streaming of the blocks change' do + subject + + expect(redis).to have_received(:publish).with('system', Oj.dump(event: :blocks_changed, account: account.id, target_account: target_account.id)).once + end + end end diff --git a/spec/services/unblock_service_spec.rb b/spec/services/unblock_service_spec.rb index 6132e74415e..453d286fc5a 100644 --- a/spec/services/unblock_service_spec.rb +++ b/spec/services/unblock_service_spec.rb @@ -7,6 +7,10 @@ RSpec.describe UnblockService do let(:sender) { Fabricate(:account, username: 'alice') } + before do + allow(redis).to receive(:publish) + end + describe 'local' do let(:bob) { Fabricate(:account) } @@ -18,6 +22,10 @@ RSpec.describe UnblockService do it 'destroys the blocking relation' do expect(sender.blocking?(bob)).to be false end + + it 'notifies streaming of the change in blocks' do + expect(redis).to have_received(:publish).with('system', Oj.dump(event: :blocks_changed, account: sender.id, target_account: bob.id)) + end end describe 'remote ActivityPub' do @@ -36,5 +44,9 @@ RSpec.describe UnblockService do it 'sends an unblock activity', :inline_jobs do expect(a_request(:post, 'http://example.com/inbox')).to have_been_made.once end + + it 'notifies streaming of the change in blocks' do + expect(redis).to have_received(:publish).with('system', Oj.dump(event: :blocks_changed, account: sender.id, target_account: bob.id)) + end end end diff --git a/spec/services/unmute_service_spec.rb b/spec/services/unmute_service_spec.rb index 92c7a70d65d..834d86cd266 100644 --- a/spec/services/unmute_service_spec.rb +++ b/spec/services/unmute_service_spec.rb @@ -7,6 +7,10 @@ RSpec.describe UnmuteService do let!(:account) { Fabricate(:account) } let!(:target_account) { Fabricate(:account) } + before do + allow(redis).to receive(:publish) + end + context 'when account is muting target account' do before { Fabricate :mute, account: account, target_account: target_account } @@ -33,6 +37,12 @@ RSpec.describe UnmuteService do .from(true) .to(false) end + + it 'notifies streaming of the change in mutes' do + subject.call(account, target_account) + + expect(redis).to have_received(:publish).with('system', Oj.dump(event: :mutes_changed, account: account.id, target_account: target_account.id)) + end end context 'when account is not muting target account' do @@ -40,6 +50,8 @@ RSpec.describe UnmuteService do expect { subject.call(account, target_account) } .to_not(change { account.reload.muting?(target_account) }) expect(MergeWorker).to_not have_enqueued_sidekiq_job(any_args) + + expect(redis).to_not have_received(:publish) end end end diff --git a/spec/workers/mute_worker_spec.rb b/spec/workers/mute_worker_spec.rb new file mode 100644 index 00000000000..c49cfc90910 --- /dev/null +++ b/spec/workers/mute_worker_spec.rb @@ -0,0 +1,36 @@ +# frozen_string_literal: true + +require 'rails_helper' + +describe MuteWorker do + subject { described_class.new.perform(account.id, target_account.id) } + + let(:account) { Fabricate(:account) } + let(:target_account) { Fabricate(:account) } + + describe '#perform' do + describe 'home timeline' do + before do + allow(FeedManager.instance).to receive(:clear_from_home) + end + + it "clears target account's statuses" do + subject + + expect(FeedManager.instance).to have_received(:clear_from_home).with(account, target_account) + end + end + + describe 'streaming integration' do + before do + allow(redis).to receive(:publish) + end + + it 'notifies streaming of the change in mutes' do + subject + + expect(redis).to have_received(:publish).with('system', Oj.dump(event: :mutes_changed, account: account.id, target_account: target_account.id)) + end + end + end +end