1
0
Fork 0
mirror of https://github.com/mastodon/mastodon.git synced 2024-08-20 21:08:15 -07:00

Compare commits

...

5 commits

20 changed files with 187 additions and 18 deletions

View file

@ -24,6 +24,9 @@ module AccessTokenExtension
end end
def push_to_streaming_api def push_to_streaming_api
redis.publish("timeline:access_token:#{id}", Oj.dump(event: :kill)) if revoked? || destroyed? if revoked? || destroyed?
redis.publish("timeline:access_token:#{id}", Oj.dump(event: :kill))
redis.publish('system', Oj.dump(event: :terminate, access_tokens: [id]))
end
end end
end end

View file

@ -40,6 +40,8 @@ module ApplicationExtension
pipeline.publish("timeline:access_token:#{id}", payload) pipeline.publish("timeline:access_token:#{id}", payload)
end end
end end
redis.publish('system', Oj.dump(event: :terminate, access_tokens: tokens.ids))
end end
end end
end end

View file

@ -18,6 +18,8 @@ class AccountDomainBlock < ApplicationRecord
belongs_to :account belongs_to :account
validates :domain, presence: true, uniqueness: { scope: :account_id }, domain: true validates :domain, presence: true, uniqueness: { scope: :account_id }, domain: true
after_destroy :notify_streaming
after_commit :invalidate_domain_blocking_cache after_commit :invalidate_domain_blocking_cache
after_commit :invalidate_follow_recommendations_cache after_commit :invalidate_follow_recommendations_cache
@ -31,4 +33,8 @@ class AccountDomainBlock < ApplicationRecord
def invalidate_follow_recommendations_cache def invalidate_follow_recommendations_cache
Rails.cache.delete("follow_recommendations/#{account_id}") Rails.cache.delete("follow_recommendations/#{account_id}")
end end
def notify_streaming
AfterUnblockDomainFromAccountService.call(account_id, domain)
end
end end

View file

@ -114,6 +114,8 @@ class CustomFilter < ApplicationRecord
Rails.cache.delete("filters:v3:#{account_id}") Rails.cache.delete("filters:v3:#{account_id}")
redis.publish("timeline:#{account_id}", Oj.dump(event: :filters_changed)) redis.publish("timeline:#{account_id}", Oj.dump(event: :filters_changed))
redis.publish("timeline:system:#{account_id}", Oj.dump(event: :filters_changed)) redis.publish("timeline:system:#{account_id}", Oj.dump(event: :filters_changed))
redis.publish('system', Oj.dump(event: :filters_changed, account_id: account_id))
end end
private private

View file

@ -342,19 +342,21 @@ class User < ApplicationRecord
def revoke_access! def revoke_access!
Doorkeeper::AccessGrant.by_resource_owner(self).update_all(revoked_at: Time.now.utc) Doorkeeper::AccessGrant.by_resource_owner(self).update_all(revoked_at: Time.now.utc)
Doorkeeper::AccessToken.by_resource_owner(self).in_batches do |batch| Doorkeeper::AccessToken.by_resource_owner(self).in_batches do |tokens|
batch.update_all(revoked_at: Time.now.utc) tokens.update_all(revoked_at: Time.now.utc)
Web::PushSubscription.where(access_token_id: batch).delete_all Web::PushSubscription.where(access_token_id: tokens).delete_all
# Revoke each access token for the Streaming API, since `update_all`` # Revoke each access token for the Streaming API, since `update_all``
# doesn't trigger ActiveRecord Callbacks: # doesn't trigger ActiveRecord Callbacks:
# TODO: #28793 Combine into a single topic # TODO: #28793 Combine into a single topic
payload = Oj.dump(event: :kill) payload = Oj.dump(event: :kill)
redis.pipelined do |pipeline| redis.pipelined do |pipeline|
batch.ids.each do |id| tokens.ids.each do |id|
pipeline.publish("timeline:access_token:#{id}", payload) pipeline.publish("timeline:access_token:#{id}", payload)
end end
end end
redis.publish('system', Oj.dump(event: :terminate, access_tokens: tokens.ids))
end end
end end

View file

@ -2,6 +2,7 @@
class AfterBlockDomainFromAccountService < BaseService class AfterBlockDomainFromAccountService < BaseService
include Payloadable include Payloadable
include Redisable
# This service does not create an AccountDomainBlock record, # This service does not create an AccountDomainBlock record,
# it's meant to be called after such a record has been created # it's meant to be called after such a record has been created
@ -16,7 +17,9 @@ class AfterBlockDomainFromAccountService < BaseService
remove_follows! remove_follows!
reject_existing_followers! reject_existing_followers!
reject_pending_follow_requests! reject_pending_follow_requests!
notify_of_severed_relationships! notify_of_severed_relationships!
notify_streaming!
end end
private private
@ -67,4 +70,8 @@ class AfterBlockDomainFromAccountService < BaseService
def domain_block_event def domain_block_event
@domain_block_event ||= RelationshipSeveranceEvent.create!(type: :user_domain_block, target_name: @domain) @domain_block_event ||= RelationshipSeveranceEvent.create!(type: :user_domain_block, target_name: @domain)
end end
def notify_streaming!
redis.publish('system', Oj.dump(event: :domain_blocks_changed, account: @account.id, target_domain: @domain))
end
end end

View file

@ -1,6 +1,8 @@
# frozen_string_literal: true # frozen_string_literal: true
class AfterBlockService < BaseService class AfterBlockService < BaseService
include Redisable
def call(account, target_account) def call(account, target_account)
@account = account @account = account
@target_account = target_account @target_account = target_account
@ -9,6 +11,8 @@ class AfterBlockService < BaseService
clear_list_feeds! clear_list_feeds!
clear_notifications! clear_notifications!
clear_conversations! clear_conversations!
notify_streaming!
end end
private private
@ -28,4 +32,8 @@ class AfterBlockService < BaseService
def clear_notifications! def clear_notifications!
Notification.where(account: @account).where(from_account: @target_account).in_batches.delete_all Notification.where(account: @account).where(from_account: @target_account).in_batches.delete_all
end end
def notify_streaming!
redis.publish('system', Oj.dump(event: :blocks_changed, account: @account.id, target_account: @target_account.id))
end
end end

View file

@ -0,0 +1,12 @@
# frozen_string_literal: true
class AfterUnblockDomainFromAccountService < BaseService
include Redisable
# This service does not delete an AccountDomainBlock record,
# it's meant to be called after such a record has been created
# synchronously, to "clean up"
def call(account, domain)
redis.publish('system', Oj.dump(event: :domain_blocks_changed, account: account.id, target_domain: domain))
end
end

View file

@ -2,12 +2,16 @@
class UnblockService < BaseService class UnblockService < BaseService
include Payloadable include Payloadable
include Redisable
def call(account, target_account) def call(account, target_account)
return unless account.blocking?(target_account) return unless account.blocking?(target_account)
unblock = account.unblock!(target_account) unblock = account.unblock!(target_account)
create_notification(unblock) if !target_account.local? && target_account.activitypub? create_notification(unblock) if !target_account.local? && target_account.activitypub?
redis.publish('system', Oj.dump(event: :blocks_changed, account: account.id, target_account: target_account.id))
unblock unblock
end end

View file

@ -1,11 +1,15 @@
# frozen_string_literal: true # frozen_string_literal: true
class UnmuteService < BaseService class UnmuteService < BaseService
include Redisable
def call(account, target_account) def call(account, target_account)
return unless account.muting?(target_account) return unless account.muting?(target_account)
account.unmute!(target_account) account.unmute!(target_account)
MergeWorker.perform_async(target_account.id, account.id) if account.following?(target_account) MergeWorker.perform_async(target_account.id, account.id) if account.following?(target_account)
redis.publish('system', Oj.dump(event: :mutes_changed, account: account.id, target_account: target_account.id))
end end
end end

View file

@ -2,10 +2,25 @@
class MuteWorker class MuteWorker
include Sidekiq::Worker include Sidekiq::Worker
include Redisable
def perform(account_id, target_account_id) def perform(account_id, target_account_id)
FeedManager.instance.clear_from_home(Account.find(account_id), Account.find(target_account_id)) @account = Account.find(account_id)
@target_account = Account.find(target_account_id)
clear_home_feed!
notify_streaming!
rescue ActiveRecord::RecordNotFound rescue ActiveRecord::RecordNotFound
true true
end end
private
def clear_home_feed!
FeedManager.instance.clear_from_home(@account, @target_account)
end
def notify_streaming!
redis.publish('system', Oj.dump(event: :mutes_changed, account: @account.id, target_account: @target_account.id))
end
end end

View file

@ -48,13 +48,16 @@ describe Oauth::AuthorizedApplicationsController do
describe 'DELETE #destroy' do describe 'DELETE #destroy' do
let!(:user) { Fabricate(:user) } let!(:user) { Fabricate(:user) }
let!(:application) { Fabricate(:application) } let!(:application) { Fabricate(:application) }
let!(:access_token) { Fabricate(:accessible_access_token, application: application, resource_owner_id: user.id) } let!(:access_tokens) { Fabricate.times(3, :accessible_access_token, application: application, resource_owner_id: user.id) }
let!(:web_push_subscription) { Fabricate(:web_push_subscription, user: user, access_token: access_token) } let!(:web_push_subscription) { Fabricate(:web_push_subscription, user: user, access_token: access_tokens[0]) }
let(:redis_pipeline_stub) { instance_double(Redis::Namespace, publish: nil) } let(:redis_pipeline_stub) { instance_double(Redis::Namespace, publish: nil) }
before do before do
sign_in user, scope: :user sign_in user, scope: :user
allow(redis).to receive(:pipelined).and_yield(redis_pipeline_stub) allow(redis).to receive(:pipelined).and_yield(redis_pipeline_stub)
allow(redis).to receive(:publish)
post :destroy, params: { id: application.id } post :destroy, params: { id: application.id }
end end
@ -71,7 +74,11 @@ describe Oauth::AuthorizedApplicationsController do
end end
it 'sends a session kill payload to the streaming server' do it 'sends a session kill payload to the streaming server' do
expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", '{"event":"kill"}') access_tokens.each do |access_token|
expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", '{"event":"kill"}')
end
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :terminate, access_tokens: access_tokens.map(&:id)))
end end
end end
end end

View file

@ -148,10 +148,12 @@ describe Settings::ApplicationsController do
describe 'destroy' do describe 'destroy' do
let(:redis_pipeline_stub) { instance_double(Redis::Namespace, publish: nil) } let(:redis_pipeline_stub) { instance_double(Redis::Namespace, publish: nil) }
let!(:access_token) { Fabricate(:accessible_access_token, application: app) } let!(:access_tokens) { Fabricate.times(3, :accessible_access_token, application: app) }
before do before do
allow(redis).to receive(:pipelined).and_yield(redis_pipeline_stub) allow(redis).to receive(:pipelined).and_yield(redis_pipeline_stub)
allow(redis).to receive(:publish)
post :destroy, params: { id: app.id } post :destroy, params: { id: app.id }
end end
@ -161,7 +163,11 @@ describe Settings::ApplicationsController do
end end
it 'sends a session kill payload to the streaming server' do it 'sends a session kill payload to the streaming server' do
expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", '{"event":"kill"}') access_tokens.each do |access_token|
expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", '{"event":"kill"}')
end
expect(redis).to have_received(:publish).with('system', Oj.dump({ event: :terminate, access_tokens: access_tokens.map(&:id) }))
end end
end end

View file

@ -466,13 +466,14 @@ RSpec.describe User do
subject(:user) { Fabricate(:user, password: 'foobar12345') } subject(:user) { Fabricate(:user, password: 'foobar12345') }
let!(:session_activation) { Fabricate(:session_activation, user: user) } let!(:session_activation) { Fabricate(:session_activation, user: user) }
let!(:access_token) { Fabricate(:access_token, resource_owner_id: user.id) } let!(:access_tokens) { Fabricate.times(3, :access_token, resource_owner_id: user.id) }
let!(:web_push_subscription) { Fabricate(:web_push_subscription, access_token: access_token) } let!(:web_push_subscription) { Fabricate(:web_push_subscription, access_token: access_tokens[0]) }
let(:redis_pipeline_stub) { instance_double(Redis::Namespace, publish: nil) } let(:redis_pipeline_stub) { instance_double(Redis::Namespace, publish: nil) }
before do before do
allow(redis).to receive(:pipelined).and_yield(redis_pipeline_stub) allow(redis).to receive(:pipelined).and_yield(redis_pipeline_stub)
allow(redis).to receive(:publish)
user.reset_password! user.reset_password!
end end
@ -490,11 +491,15 @@ RSpec.describe User do
end end
it 'revokes streaming access for all access tokens' do it 'revokes streaming access for all access tokens' do
expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", Oj.dump(event: :kill)).once access_tokens.each do |access_token|
expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", Oj.dump(event: :kill)).once
end
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :terminate, access_tokens: access_tokens.map(&:id))).once
end end
it 'removes push subscriptions' do it 'removes push subscriptions' do
expect(Web::PushSubscription.where(user: user).or(Web::PushSubscription.where(access_token: access_token)).count).to eq 0 expect(Web::PushSubscription.where(user: user).or(Web::PushSubscription.where(access_token: access_tokens[0])).count).to eq 0
expect { web_push_subscription.reload }.to raise_error(ActiveRecord::RecordNotFound) expect { web_push_subscription.reload }.to raise_error(ActiveRecord::RecordNotFound)
end end
end end

View file

@ -199,6 +199,7 @@ RSpec.describe 'Filters' do
subject subject
expect(redis).to have_received(:publish).with("timeline:#{user.account.id}", Oj.dump(event: :filters_changed)).once expect(redis).to have_received(:publish).with("timeline:#{user.account.id}", Oj.dump(event: :filters_changed)).once
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :filters_changed, account_id: user.account.id)).once
end end
end end

View file

@ -5,8 +5,9 @@ require 'rails_helper'
RSpec.describe AfterBlockDomainFromAccountService do RSpec.describe AfterBlockDomainFromAccountService do
subject { described_class.new } subject { described_class.new }
let(:wolf) { Fabricate(:account, username: 'wolf', domain: 'evil.org', inbox_url: 'https://evil.org/wolf/inbox', protocol: :activitypub) } let(:target_domain) { 'evil.org' }
let(:dog) { Fabricate(:account, username: 'dog', domain: 'evil.org', inbox_url: 'https://evil.org/dog/inbox', protocol: :activitypub) } let(:wolf) { Fabricate(:account, username: 'wolf', domain: target_domain, inbox_url: 'https://evil.org/wolf/inbox', protocol: :activitypub) }
let(:dog) { Fabricate(:account, username: 'dog', domain: target_domain, inbox_url: 'https://evil.org/dog/inbox', protocol: :activitypub) }
let(:alice) { Fabricate(:account, username: 'alice') } let(:alice) { Fabricate(:account, username: 'alice') }
before do before do
@ -17,7 +18,7 @@ RSpec.describe AfterBlockDomainFromAccountService do
end end
it 'purge followers from blocked domain, remove notification permissions, sends `Reject->Follow`, and records severed relationships', :aggregate_failures do it 'purge followers from blocked domain, remove notification permissions, sends `Reject->Follow`, and records severed relationships', :aggregate_failures do
expect { subject.call(alice, 'evil.org') } expect { subject.call(alice, target_domain) }
.to change { wolf.following?(alice) }.from(true).to(false) .to change { wolf.following?(alice) }.from(true).to(false)
.and change { NotificationPermission.exists?(account: alice, from_account: wolf) }.from(true).to(false) .and change { NotificationPermission.exists?(account: alice, from_account: wolf) }.from(true).to(false)
@ -31,4 +32,16 @@ RSpec.describe AfterBlockDomainFromAccountService do
expect(severed_relationships[0].relationship_severance_event).to eq severed_relationships[1].relationship_severance_event expect(severed_relationships[0].relationship_severance_event).to eq severed_relationships[1].relationship_severance_event
expect(severed_relationships.map { |rel| [rel.account, rel.target_account] }).to contain_exactly([wolf, alice], [alice, dog]) expect(severed_relationships.map { |rel| [rel.account, rel.target_account] }).to contain_exactly([wolf, alice], [alice, dog])
end end
describe 'streaming integration' do
before do
allow(redis).to receive(:publish)
end
it 'notifies streaming of the domain blocks change' do
subject.call(alice, target_domain)
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :domain_blocks_changed, account: alice.id, target_domain: target_domain)).once
end
end
end end

View file

@ -48,4 +48,16 @@ RSpec.describe AfterBlockService do
}.from([status.id.to_s, other_account_status.id.to_s, other_account_reblog.id.to_s]).to([other_account_status.id.to_s]) }.from([status.id.to_s, other_account_status.id.to_s, other_account_reblog.id.to_s]).to([other_account_status.id.to_s])
end end
end end
describe 'streaming integration' do
before do
allow(redis).to receive(:publish)
end
it 'notifies streaming of the blocks change' do
subject
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :blocks_changed, account: account.id, target_account: target_account.id)).once
end
end
end end

View file

@ -7,6 +7,10 @@ RSpec.describe UnblockService do
let(:sender) { Fabricate(:account, username: 'alice') } let(:sender) { Fabricate(:account, username: 'alice') }
before do
allow(redis).to receive(:publish)
end
describe 'local' do describe 'local' do
let(:bob) { Fabricate(:account) } let(:bob) { Fabricate(:account) }
@ -18,6 +22,10 @@ RSpec.describe UnblockService do
it 'destroys the blocking relation' do it 'destroys the blocking relation' do
expect(sender.blocking?(bob)).to be false expect(sender.blocking?(bob)).to be false
end end
it 'notifies streaming of the change in blocks' do
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :blocks_changed, account: sender.id, target_account: bob.id))
end
end end
describe 'remote ActivityPub' do describe 'remote ActivityPub' do
@ -36,5 +44,9 @@ RSpec.describe UnblockService do
it 'sends an unblock activity', :inline_jobs do it 'sends an unblock activity', :inline_jobs do
expect(a_request(:post, 'http://example.com/inbox')).to have_been_made.once expect(a_request(:post, 'http://example.com/inbox')).to have_been_made.once
end end
it 'notifies streaming of the change in blocks' do
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :blocks_changed, account: sender.id, target_account: bob.id))
end
end end
end end

View file

@ -7,6 +7,10 @@ RSpec.describe UnmuteService do
let!(:account) { Fabricate(:account) } let!(:account) { Fabricate(:account) }
let!(:target_account) { Fabricate(:account) } let!(:target_account) { Fabricate(:account) }
before do
allow(redis).to receive(:publish)
end
context 'when account is muting target account' do context 'when account is muting target account' do
before { Fabricate :mute, account: account, target_account: target_account } before { Fabricate :mute, account: account, target_account: target_account }
@ -33,6 +37,12 @@ RSpec.describe UnmuteService do
.from(true) .from(true)
.to(false) .to(false)
end end
it 'notifies streaming of the change in mutes' do
subject.call(account, target_account)
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :mutes_changed, account: account.id, target_account: target_account.id))
end
end end
context 'when account is not muting target account' do context 'when account is not muting target account' do
@ -40,6 +50,8 @@ RSpec.describe UnmuteService do
expect { subject.call(account, target_account) } expect { subject.call(account, target_account) }
.to_not(change { account.reload.muting?(target_account) }) .to_not(change { account.reload.muting?(target_account) })
expect(MergeWorker).to_not have_enqueued_sidekiq_job(any_args) expect(MergeWorker).to_not have_enqueued_sidekiq_job(any_args)
expect(redis).to_not have_received(:publish)
end end
end end
end end

View file

@ -0,0 +1,36 @@
# frozen_string_literal: true
require 'rails_helper'
describe MuteWorker do
subject { described_class.new.perform(account.id, target_account.id) }
let(:account) { Fabricate(:account) }
let(:target_account) { Fabricate(:account) }
describe '#perform' do
describe 'home timeline' do
before do
allow(FeedManager.instance).to receive(:clear_from_home)
end
it "clears target account's statuses" do
subject
expect(FeedManager.instance).to have_received(:clear_from_home).with(account, target_account)
end
end
describe 'streaming integration' do
before do
allow(redis).to receive(:publish)
end
it 'notifies streaming of the change in mutes' do
subject
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :mutes_changed, account: account.id, target_account: target_account.id))
end
end
end
end