1
0
Fork 0
mirror of https://github.com/mastodon/mastodon.git synced 2024-08-20 21:08:15 -07:00

Compare commits

...

5 commits

20 changed files with 187 additions and 18 deletions

View file

@ -24,6 +24,9 @@ module AccessTokenExtension
end
def push_to_streaming_api
redis.publish("timeline:access_token:#{id}", Oj.dump(event: :kill)) if revoked? || destroyed?
if revoked? || destroyed?
redis.publish("timeline:access_token:#{id}", Oj.dump(event: :kill))
redis.publish('system', Oj.dump(event: :terminate, access_tokens: [id]))
end
end
end

View file

@ -40,6 +40,8 @@ module ApplicationExtension
pipeline.publish("timeline:access_token:#{id}", payload)
end
end
redis.publish('system', Oj.dump(event: :terminate, access_tokens: tokens.ids))
end
end
end

View file

@ -18,6 +18,8 @@ class AccountDomainBlock < ApplicationRecord
belongs_to :account
validates :domain, presence: true, uniqueness: { scope: :account_id }, domain: true
after_destroy :notify_streaming
after_commit :invalidate_domain_blocking_cache
after_commit :invalidate_follow_recommendations_cache
@ -31,4 +33,8 @@ class AccountDomainBlock < ApplicationRecord
def invalidate_follow_recommendations_cache
Rails.cache.delete("follow_recommendations/#{account_id}")
end
def notify_streaming
AfterUnblockDomainFromAccountService.call(account_id, domain)
end
end

View file

@ -114,6 +114,8 @@ class CustomFilter < ApplicationRecord
Rails.cache.delete("filters:v3:#{account_id}")
redis.publish("timeline:#{account_id}", Oj.dump(event: :filters_changed))
redis.publish("timeline:system:#{account_id}", Oj.dump(event: :filters_changed))
redis.publish('system', Oj.dump(event: :filters_changed, account_id: account_id))
end
private

View file

@ -342,19 +342,21 @@ class User < ApplicationRecord
def revoke_access!
Doorkeeper::AccessGrant.by_resource_owner(self).update_all(revoked_at: Time.now.utc)
Doorkeeper::AccessToken.by_resource_owner(self).in_batches do |batch|
batch.update_all(revoked_at: Time.now.utc)
Web::PushSubscription.where(access_token_id: batch).delete_all
Doorkeeper::AccessToken.by_resource_owner(self).in_batches do |tokens|
tokens.update_all(revoked_at: Time.now.utc)
Web::PushSubscription.where(access_token_id: tokens).delete_all
# Revoke each access token for the Streaming API, since `update_all``
# doesn't trigger ActiveRecord Callbacks:
# TODO: #28793 Combine into a single topic
payload = Oj.dump(event: :kill)
redis.pipelined do |pipeline|
batch.ids.each do |id|
tokens.ids.each do |id|
pipeline.publish("timeline:access_token:#{id}", payload)
end
end
redis.publish('system', Oj.dump(event: :terminate, access_tokens: tokens.ids))
end
end

View file

@ -2,6 +2,7 @@
class AfterBlockDomainFromAccountService < BaseService
include Payloadable
include Redisable
# This service does not create an AccountDomainBlock record,
# it's meant to be called after such a record has been created
@ -16,7 +17,9 @@ class AfterBlockDomainFromAccountService < BaseService
remove_follows!
reject_existing_followers!
reject_pending_follow_requests!
notify_of_severed_relationships!
notify_streaming!
end
private
@ -67,4 +70,8 @@ class AfterBlockDomainFromAccountService < BaseService
def domain_block_event
@domain_block_event ||= RelationshipSeveranceEvent.create!(type: :user_domain_block, target_name: @domain)
end
def notify_streaming!
redis.publish('system', Oj.dump(event: :domain_blocks_changed, account: @account.id, target_domain: @domain))
end
end

View file

@ -1,6 +1,8 @@
# frozen_string_literal: true
class AfterBlockService < BaseService
include Redisable
def call(account, target_account)
@account = account
@target_account = target_account
@ -9,6 +11,8 @@ class AfterBlockService < BaseService
clear_list_feeds!
clear_notifications!
clear_conversations!
notify_streaming!
end
private
@ -28,4 +32,8 @@ class AfterBlockService < BaseService
def clear_notifications!
Notification.where(account: @account).where(from_account: @target_account).in_batches.delete_all
end
def notify_streaming!
redis.publish('system', Oj.dump(event: :blocks_changed, account: @account.id, target_account: @target_account.id))
end
end

View file

@ -0,0 +1,12 @@
# frozen_string_literal: true
class AfterUnblockDomainFromAccountService < BaseService
include Redisable
# This service does not delete an AccountDomainBlock record,
# it's meant to be called after such a record has been created
# synchronously, to "clean up"
def call(account, domain)
redis.publish('system', Oj.dump(event: :domain_blocks_changed, account: account.id, target_domain: domain))
end
end

View file

@ -2,12 +2,16 @@
class UnblockService < BaseService
include Payloadable
include Redisable
def call(account, target_account)
return unless account.blocking?(target_account)
unblock = account.unblock!(target_account)
create_notification(unblock) if !target_account.local? && target_account.activitypub?
redis.publish('system', Oj.dump(event: :blocks_changed, account: account.id, target_account: target_account.id))
unblock
end

View file

@ -1,11 +1,15 @@
# frozen_string_literal: true
class UnmuteService < BaseService
include Redisable
def call(account, target_account)
return unless account.muting?(target_account)
account.unmute!(target_account)
MergeWorker.perform_async(target_account.id, account.id) if account.following?(target_account)
redis.publish('system', Oj.dump(event: :mutes_changed, account: account.id, target_account: target_account.id))
end
end

View file

@ -2,10 +2,25 @@
class MuteWorker
include Sidekiq::Worker
include Redisable
def perform(account_id, target_account_id)
FeedManager.instance.clear_from_home(Account.find(account_id), Account.find(target_account_id))
@account = Account.find(account_id)
@target_account = Account.find(target_account_id)
clear_home_feed!
notify_streaming!
rescue ActiveRecord::RecordNotFound
true
end
private
def clear_home_feed!
FeedManager.instance.clear_from_home(@account, @target_account)
end
def notify_streaming!
redis.publish('system', Oj.dump(event: :mutes_changed, account: @account.id, target_account: @target_account.id))
end
end

View file

@ -48,13 +48,16 @@ describe Oauth::AuthorizedApplicationsController do
describe 'DELETE #destroy' do
let!(:user) { Fabricate(:user) }
let!(:application) { Fabricate(:application) }
let!(:access_token) { Fabricate(:accessible_access_token, application: application, resource_owner_id: user.id) }
let!(:web_push_subscription) { Fabricate(:web_push_subscription, user: user, access_token: access_token) }
let!(:access_tokens) { Fabricate.times(3, :accessible_access_token, application: application, resource_owner_id: user.id) }
let!(:web_push_subscription) { Fabricate(:web_push_subscription, user: user, access_token: access_tokens[0]) }
let(:redis_pipeline_stub) { instance_double(Redis::Namespace, publish: nil) }
before do
sign_in user, scope: :user
allow(redis).to receive(:pipelined).and_yield(redis_pipeline_stub)
allow(redis).to receive(:publish)
post :destroy, params: { id: application.id }
end
@ -71,7 +74,11 @@ describe Oauth::AuthorizedApplicationsController do
end
it 'sends a session kill payload to the streaming server' do
expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", '{"event":"kill"}')
access_tokens.each do |access_token|
expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", '{"event":"kill"}')
end
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :terminate, access_tokens: access_tokens.map(&:id)))
end
end
end

View file

@ -148,10 +148,12 @@ describe Settings::ApplicationsController do
describe 'destroy' do
let(:redis_pipeline_stub) { instance_double(Redis::Namespace, publish: nil) }
let!(:access_token) { Fabricate(:accessible_access_token, application: app) }
let!(:access_tokens) { Fabricate.times(3, :accessible_access_token, application: app) }
before do
allow(redis).to receive(:pipelined).and_yield(redis_pipeline_stub)
allow(redis).to receive(:publish)
post :destroy, params: { id: app.id }
end
@ -161,7 +163,11 @@ describe Settings::ApplicationsController do
end
it 'sends a session kill payload to the streaming server' do
expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", '{"event":"kill"}')
access_tokens.each do |access_token|
expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", '{"event":"kill"}')
end
expect(redis).to have_received(:publish).with('system', Oj.dump({ event: :terminate, access_tokens: access_tokens.map(&:id) }))
end
end

View file

@ -466,13 +466,14 @@ RSpec.describe User do
subject(:user) { Fabricate(:user, password: 'foobar12345') }
let!(:session_activation) { Fabricate(:session_activation, user: user) }
let!(:access_token) { Fabricate(:access_token, resource_owner_id: user.id) }
let!(:web_push_subscription) { Fabricate(:web_push_subscription, access_token: access_token) }
let!(:access_tokens) { Fabricate.times(3, :access_token, resource_owner_id: user.id) }
let!(:web_push_subscription) { Fabricate(:web_push_subscription, access_token: access_tokens[0]) }
let(:redis_pipeline_stub) { instance_double(Redis::Namespace, publish: nil) }
before do
allow(redis).to receive(:pipelined).and_yield(redis_pipeline_stub)
allow(redis).to receive(:publish)
user.reset_password!
end
@ -490,11 +491,15 @@ RSpec.describe User do
end
it 'revokes streaming access for all access tokens' do
expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", Oj.dump(event: :kill)).once
access_tokens.each do |access_token|
expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", Oj.dump(event: :kill)).once
end
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :terminate, access_tokens: access_tokens.map(&:id))).once
end
it 'removes push subscriptions' do
expect(Web::PushSubscription.where(user: user).or(Web::PushSubscription.where(access_token: access_token)).count).to eq 0
expect(Web::PushSubscription.where(user: user).or(Web::PushSubscription.where(access_token: access_tokens[0])).count).to eq 0
expect { web_push_subscription.reload }.to raise_error(ActiveRecord::RecordNotFound)
end
end

View file

@ -199,6 +199,7 @@ RSpec.describe 'Filters' do
subject
expect(redis).to have_received(:publish).with("timeline:#{user.account.id}", Oj.dump(event: :filters_changed)).once
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :filters_changed, account_id: user.account.id)).once
end
end

View file

@ -5,8 +5,9 @@ require 'rails_helper'
RSpec.describe AfterBlockDomainFromAccountService do
subject { described_class.new }
let(:wolf) { Fabricate(:account, username: 'wolf', domain: 'evil.org', inbox_url: 'https://evil.org/wolf/inbox', protocol: :activitypub) }
let(:dog) { Fabricate(:account, username: 'dog', domain: 'evil.org', inbox_url: 'https://evil.org/dog/inbox', protocol: :activitypub) }
let(:target_domain) { 'evil.org' }
let(:wolf) { Fabricate(:account, username: 'wolf', domain: target_domain, inbox_url: 'https://evil.org/wolf/inbox', protocol: :activitypub) }
let(:dog) { Fabricate(:account, username: 'dog', domain: target_domain, inbox_url: 'https://evil.org/dog/inbox', protocol: :activitypub) }
let(:alice) { Fabricate(:account, username: 'alice') }
before do
@ -17,7 +18,7 @@ RSpec.describe AfterBlockDomainFromAccountService do
end
it 'purge followers from blocked domain, remove notification permissions, sends `Reject->Follow`, and records severed relationships', :aggregate_failures do
expect { subject.call(alice, 'evil.org') }
expect { subject.call(alice, target_domain) }
.to change { wolf.following?(alice) }.from(true).to(false)
.and change { NotificationPermission.exists?(account: alice, from_account: wolf) }.from(true).to(false)
@ -31,4 +32,16 @@ RSpec.describe AfterBlockDomainFromAccountService do
expect(severed_relationships[0].relationship_severance_event).to eq severed_relationships[1].relationship_severance_event
expect(severed_relationships.map { |rel| [rel.account, rel.target_account] }).to contain_exactly([wolf, alice], [alice, dog])
end
describe 'streaming integration' do
before do
allow(redis).to receive(:publish)
end
it 'notifies streaming of the domain blocks change' do
subject.call(alice, target_domain)
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :domain_blocks_changed, account: alice.id, target_domain: target_domain)).once
end
end
end

View file

@ -48,4 +48,16 @@ RSpec.describe AfterBlockService do
}.from([status.id.to_s, other_account_status.id.to_s, other_account_reblog.id.to_s]).to([other_account_status.id.to_s])
end
end
describe 'streaming integration' do
before do
allow(redis).to receive(:publish)
end
it 'notifies streaming of the blocks change' do
subject
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :blocks_changed, account: account.id, target_account: target_account.id)).once
end
end
end

View file

@ -7,6 +7,10 @@ RSpec.describe UnblockService do
let(:sender) { Fabricate(:account, username: 'alice') }
before do
allow(redis).to receive(:publish)
end
describe 'local' do
let(:bob) { Fabricate(:account) }
@ -18,6 +22,10 @@ RSpec.describe UnblockService do
it 'destroys the blocking relation' do
expect(sender.blocking?(bob)).to be false
end
it 'notifies streaming of the change in blocks' do
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :blocks_changed, account: sender.id, target_account: bob.id))
end
end
describe 'remote ActivityPub' do
@ -36,5 +44,9 @@ RSpec.describe UnblockService do
it 'sends an unblock activity', :inline_jobs do
expect(a_request(:post, 'http://example.com/inbox')).to have_been_made.once
end
it 'notifies streaming of the change in blocks' do
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :blocks_changed, account: sender.id, target_account: bob.id))
end
end
end

View file

@ -7,6 +7,10 @@ RSpec.describe UnmuteService do
let!(:account) { Fabricate(:account) }
let!(:target_account) { Fabricate(:account) }
before do
allow(redis).to receive(:publish)
end
context 'when account is muting target account' do
before { Fabricate :mute, account: account, target_account: target_account }
@ -33,6 +37,12 @@ RSpec.describe UnmuteService do
.from(true)
.to(false)
end
it 'notifies streaming of the change in mutes' do
subject.call(account, target_account)
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :mutes_changed, account: account.id, target_account: target_account.id))
end
end
context 'when account is not muting target account' do
@ -40,6 +50,8 @@ RSpec.describe UnmuteService do
expect { subject.call(account, target_account) }
.to_not(change { account.reload.muting?(target_account) })
expect(MergeWorker).to_not have_enqueued_sidekiq_job(any_args)
expect(redis).to_not have_received(:publish)
end
end
end

View file

@ -0,0 +1,36 @@
# frozen_string_literal: true
require 'rails_helper'
describe MuteWorker do
subject { described_class.new.perform(account.id, target_account.id) }
let(:account) { Fabricate(:account) }
let(:target_account) { Fabricate(:account) }
describe '#perform' do
describe 'home timeline' do
before do
allow(FeedManager.instance).to receive(:clear_from_home)
end
it "clears target account's statuses" do
subject
expect(FeedManager.instance).to have_received(:clear_from_home).with(account, target_account)
end
end
describe 'streaming integration' do
before do
allow(redis).to receive(:publish)
end
it 'notifies streaming of the change in mutes' do
subject
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :mutes_changed, account: account.id, target_account: target_account.id))
end
end
end
end