mirror of
https://github.com/mastodon/mastodon.git
synced 2024-08-20 21:08:15 -07:00
Compare commits
5 commits
ba5b82cfe5
...
f072787c5a
Author | SHA1 | Date | |
---|---|---|---|
|
f072787c5a | ||
|
afc74a89b2 | ||
|
25f9248ecd | ||
|
b12b27663a | ||
|
209376306b |
20 changed files with 187 additions and 18 deletions
|
@ -24,6 +24,9 @@ module AccessTokenExtension
|
|||
end
|
||||
|
||||
def push_to_streaming_api
|
||||
redis.publish("timeline:access_token:#{id}", Oj.dump(event: :kill)) if revoked? || destroyed?
|
||||
if revoked? || destroyed?
|
||||
redis.publish("timeline:access_token:#{id}", Oj.dump(event: :kill))
|
||||
redis.publish('system', Oj.dump(event: :terminate, access_tokens: [id]))
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -40,6 +40,8 @@ module ApplicationExtension
|
|||
pipeline.publish("timeline:access_token:#{id}", payload)
|
||||
end
|
||||
end
|
||||
|
||||
redis.publish('system', Oj.dump(event: :terminate, access_tokens: tokens.ids))
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -18,6 +18,8 @@ class AccountDomainBlock < ApplicationRecord
|
|||
belongs_to :account
|
||||
validates :domain, presence: true, uniqueness: { scope: :account_id }, domain: true
|
||||
|
||||
after_destroy :notify_streaming
|
||||
|
||||
after_commit :invalidate_domain_blocking_cache
|
||||
after_commit :invalidate_follow_recommendations_cache
|
||||
|
||||
|
@ -31,4 +33,8 @@ class AccountDomainBlock < ApplicationRecord
|
|||
def invalidate_follow_recommendations_cache
|
||||
Rails.cache.delete("follow_recommendations/#{account_id}")
|
||||
end
|
||||
|
||||
def notify_streaming
|
||||
AfterUnblockDomainFromAccountService.call(account_id, domain)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -114,6 +114,8 @@ class CustomFilter < ApplicationRecord
|
|||
Rails.cache.delete("filters:v3:#{account_id}")
|
||||
redis.publish("timeline:#{account_id}", Oj.dump(event: :filters_changed))
|
||||
redis.publish("timeline:system:#{account_id}", Oj.dump(event: :filters_changed))
|
||||
|
||||
redis.publish('system', Oj.dump(event: :filters_changed, account_id: account_id))
|
||||
end
|
||||
|
||||
private
|
||||
|
|
|
@ -342,19 +342,21 @@ class User < ApplicationRecord
|
|||
def revoke_access!
|
||||
Doorkeeper::AccessGrant.by_resource_owner(self).update_all(revoked_at: Time.now.utc)
|
||||
|
||||
Doorkeeper::AccessToken.by_resource_owner(self).in_batches do |batch|
|
||||
batch.update_all(revoked_at: Time.now.utc)
|
||||
Web::PushSubscription.where(access_token_id: batch).delete_all
|
||||
Doorkeeper::AccessToken.by_resource_owner(self).in_batches do |tokens|
|
||||
tokens.update_all(revoked_at: Time.now.utc)
|
||||
Web::PushSubscription.where(access_token_id: tokens).delete_all
|
||||
|
||||
# Revoke each access token for the Streaming API, since `update_all``
|
||||
# doesn't trigger ActiveRecord Callbacks:
|
||||
# TODO: #28793 Combine into a single topic
|
||||
payload = Oj.dump(event: :kill)
|
||||
redis.pipelined do |pipeline|
|
||||
batch.ids.each do |id|
|
||||
tokens.ids.each do |id|
|
||||
pipeline.publish("timeline:access_token:#{id}", payload)
|
||||
end
|
||||
end
|
||||
|
||||
redis.publish('system', Oj.dump(event: :terminate, access_tokens: tokens.ids))
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
class AfterBlockDomainFromAccountService < BaseService
|
||||
include Payloadable
|
||||
include Redisable
|
||||
|
||||
# This service does not create an AccountDomainBlock record,
|
||||
# it's meant to be called after such a record has been created
|
||||
|
@ -16,7 +17,9 @@ class AfterBlockDomainFromAccountService < BaseService
|
|||
remove_follows!
|
||||
reject_existing_followers!
|
||||
reject_pending_follow_requests!
|
||||
|
||||
notify_of_severed_relationships!
|
||||
notify_streaming!
|
||||
end
|
||||
|
||||
private
|
||||
|
@ -67,4 +70,8 @@ class AfterBlockDomainFromAccountService < BaseService
|
|||
def domain_block_event
|
||||
@domain_block_event ||= RelationshipSeveranceEvent.create!(type: :user_domain_block, target_name: @domain)
|
||||
end
|
||||
|
||||
def notify_streaming!
|
||||
redis.publish('system', Oj.dump(event: :domain_blocks_changed, account: @account.id, target_domain: @domain))
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
class AfterBlockService < BaseService
|
||||
include Redisable
|
||||
|
||||
def call(account, target_account)
|
||||
@account = account
|
||||
@target_account = target_account
|
||||
|
@ -9,6 +11,8 @@ class AfterBlockService < BaseService
|
|||
clear_list_feeds!
|
||||
clear_notifications!
|
||||
clear_conversations!
|
||||
|
||||
notify_streaming!
|
||||
end
|
||||
|
||||
private
|
||||
|
@ -28,4 +32,8 @@ class AfterBlockService < BaseService
|
|||
def clear_notifications!
|
||||
Notification.where(account: @account).where(from_account: @target_account).in_batches.delete_all
|
||||
end
|
||||
|
||||
def notify_streaming!
|
||||
redis.publish('system', Oj.dump(event: :blocks_changed, account: @account.id, target_account: @target_account.id))
|
||||
end
|
||||
end
|
||||
|
|
12
app/services/after_unblock_domain_from_account_service.rb
Normal file
12
app/services/after_unblock_domain_from_account_service.rb
Normal file
|
@ -0,0 +1,12 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
class AfterUnblockDomainFromAccountService < BaseService
|
||||
include Redisable
|
||||
|
||||
# This service does not delete an AccountDomainBlock record,
|
||||
# it's meant to be called after such a record has been created
|
||||
# synchronously, to "clean up"
|
||||
def call(account, domain)
|
||||
redis.publish('system', Oj.dump(event: :domain_blocks_changed, account: account.id, target_domain: domain))
|
||||
end
|
||||
end
|
|
@ -2,12 +2,16 @@
|
|||
|
||||
class UnblockService < BaseService
|
||||
include Payloadable
|
||||
include Redisable
|
||||
|
||||
def call(account, target_account)
|
||||
return unless account.blocking?(target_account)
|
||||
|
||||
unblock = account.unblock!(target_account)
|
||||
create_notification(unblock) if !target_account.local? && target_account.activitypub?
|
||||
|
||||
redis.publish('system', Oj.dump(event: :blocks_changed, account: account.id, target_account: target_account.id))
|
||||
|
||||
unblock
|
||||
end
|
||||
|
||||
|
|
|
@ -1,11 +1,15 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
class UnmuteService < BaseService
|
||||
include Redisable
|
||||
|
||||
def call(account, target_account)
|
||||
return unless account.muting?(target_account)
|
||||
|
||||
account.unmute!(target_account)
|
||||
|
||||
MergeWorker.perform_async(target_account.id, account.id) if account.following?(target_account)
|
||||
|
||||
redis.publish('system', Oj.dump(event: :mutes_changed, account: account.id, target_account: target_account.id))
|
||||
end
|
||||
end
|
||||
|
|
|
@ -2,10 +2,25 @@
|
|||
|
||||
class MuteWorker
|
||||
include Sidekiq::Worker
|
||||
include Redisable
|
||||
|
||||
def perform(account_id, target_account_id)
|
||||
FeedManager.instance.clear_from_home(Account.find(account_id), Account.find(target_account_id))
|
||||
@account = Account.find(account_id)
|
||||
@target_account = Account.find(target_account_id)
|
||||
|
||||
clear_home_feed!
|
||||
notify_streaming!
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def clear_home_feed!
|
||||
FeedManager.instance.clear_from_home(@account, @target_account)
|
||||
end
|
||||
|
||||
def notify_streaming!
|
||||
redis.publish('system', Oj.dump(event: :mutes_changed, account: @account.id, target_account: @target_account.id))
|
||||
end
|
||||
end
|
||||
|
|
|
@ -48,13 +48,16 @@ describe Oauth::AuthorizedApplicationsController do
|
|||
describe 'DELETE #destroy' do
|
||||
let!(:user) { Fabricate(:user) }
|
||||
let!(:application) { Fabricate(:application) }
|
||||
let!(:access_token) { Fabricate(:accessible_access_token, application: application, resource_owner_id: user.id) }
|
||||
let!(:web_push_subscription) { Fabricate(:web_push_subscription, user: user, access_token: access_token) }
|
||||
let!(:access_tokens) { Fabricate.times(3, :accessible_access_token, application: application, resource_owner_id: user.id) }
|
||||
let!(:web_push_subscription) { Fabricate(:web_push_subscription, user: user, access_token: access_tokens[0]) }
|
||||
let(:redis_pipeline_stub) { instance_double(Redis::Namespace, publish: nil) }
|
||||
|
||||
before do
|
||||
sign_in user, scope: :user
|
||||
|
||||
allow(redis).to receive(:pipelined).and_yield(redis_pipeline_stub)
|
||||
allow(redis).to receive(:publish)
|
||||
|
||||
post :destroy, params: { id: application.id }
|
||||
end
|
||||
|
||||
|
@ -71,7 +74,11 @@ describe Oauth::AuthorizedApplicationsController do
|
|||
end
|
||||
|
||||
it 'sends a session kill payload to the streaming server' do
|
||||
access_tokens.each do |access_token|
|
||||
expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", '{"event":"kill"}')
|
||||
end
|
||||
|
||||
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :terminate, access_tokens: access_tokens.map(&:id)))
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -148,10 +148,12 @@ describe Settings::ApplicationsController do
|
|||
|
||||
describe 'destroy' do
|
||||
let(:redis_pipeline_stub) { instance_double(Redis::Namespace, publish: nil) }
|
||||
let!(:access_token) { Fabricate(:accessible_access_token, application: app) }
|
||||
let!(:access_tokens) { Fabricate.times(3, :accessible_access_token, application: app) }
|
||||
|
||||
before do
|
||||
allow(redis).to receive(:pipelined).and_yield(redis_pipeline_stub)
|
||||
allow(redis).to receive(:publish)
|
||||
|
||||
post :destroy, params: { id: app.id }
|
||||
end
|
||||
|
||||
|
@ -161,8 +163,12 @@ describe Settings::ApplicationsController do
|
|||
end
|
||||
|
||||
it 'sends a session kill payload to the streaming server' do
|
||||
access_tokens.each do |access_token|
|
||||
expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", '{"event":"kill"}')
|
||||
end
|
||||
|
||||
expect(redis).to have_received(:publish).with('system', Oj.dump({ event: :terminate, access_tokens: access_tokens.map(&:id) }))
|
||||
end
|
||||
end
|
||||
|
||||
describe 'regenerate' do
|
||||
|
|
|
@ -466,13 +466,14 @@ RSpec.describe User do
|
|||
subject(:user) { Fabricate(:user, password: 'foobar12345') }
|
||||
|
||||
let!(:session_activation) { Fabricate(:session_activation, user: user) }
|
||||
let!(:access_token) { Fabricate(:access_token, resource_owner_id: user.id) }
|
||||
let!(:web_push_subscription) { Fabricate(:web_push_subscription, access_token: access_token) }
|
||||
let!(:access_tokens) { Fabricate.times(3, :access_token, resource_owner_id: user.id) }
|
||||
let!(:web_push_subscription) { Fabricate(:web_push_subscription, access_token: access_tokens[0]) }
|
||||
|
||||
let(:redis_pipeline_stub) { instance_double(Redis::Namespace, publish: nil) }
|
||||
|
||||
before do
|
||||
allow(redis).to receive(:pipelined).and_yield(redis_pipeline_stub)
|
||||
allow(redis).to receive(:publish)
|
||||
user.reset_password!
|
||||
end
|
||||
|
||||
|
@ -490,11 +491,15 @@ RSpec.describe User do
|
|||
end
|
||||
|
||||
it 'revokes streaming access for all access tokens' do
|
||||
access_tokens.each do |access_token|
|
||||
expect(redis_pipeline_stub).to have_received(:publish).with("timeline:access_token:#{access_token.id}", Oj.dump(event: :kill)).once
|
||||
end
|
||||
|
||||
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :terminate, access_tokens: access_tokens.map(&:id))).once
|
||||
end
|
||||
|
||||
it 'removes push subscriptions' do
|
||||
expect(Web::PushSubscription.where(user: user).or(Web::PushSubscription.where(access_token: access_token)).count).to eq 0
|
||||
expect(Web::PushSubscription.where(user: user).or(Web::PushSubscription.where(access_token: access_tokens[0])).count).to eq 0
|
||||
expect { web_push_subscription.reload }.to raise_error(ActiveRecord::RecordNotFound)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -199,6 +199,7 @@ RSpec.describe 'Filters' do
|
|||
subject
|
||||
|
||||
expect(redis).to have_received(:publish).with("timeline:#{user.account.id}", Oj.dump(event: :filters_changed)).once
|
||||
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :filters_changed, account_id: user.account.id)).once
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -5,8 +5,9 @@ require 'rails_helper'
|
|||
RSpec.describe AfterBlockDomainFromAccountService do
|
||||
subject { described_class.new }
|
||||
|
||||
let(:wolf) { Fabricate(:account, username: 'wolf', domain: 'evil.org', inbox_url: 'https://evil.org/wolf/inbox', protocol: :activitypub) }
|
||||
let(:dog) { Fabricate(:account, username: 'dog', domain: 'evil.org', inbox_url: 'https://evil.org/dog/inbox', protocol: :activitypub) }
|
||||
let(:target_domain) { 'evil.org' }
|
||||
let(:wolf) { Fabricate(:account, username: 'wolf', domain: target_domain, inbox_url: 'https://evil.org/wolf/inbox', protocol: :activitypub) }
|
||||
let(:dog) { Fabricate(:account, username: 'dog', domain: target_domain, inbox_url: 'https://evil.org/dog/inbox', protocol: :activitypub) }
|
||||
let(:alice) { Fabricate(:account, username: 'alice') }
|
||||
|
||||
before do
|
||||
|
@ -17,7 +18,7 @@ RSpec.describe AfterBlockDomainFromAccountService do
|
|||
end
|
||||
|
||||
it 'purge followers from blocked domain, remove notification permissions, sends `Reject->Follow`, and records severed relationships', :aggregate_failures do
|
||||
expect { subject.call(alice, 'evil.org') }
|
||||
expect { subject.call(alice, target_domain) }
|
||||
.to change { wolf.following?(alice) }.from(true).to(false)
|
||||
.and change { NotificationPermission.exists?(account: alice, from_account: wolf) }.from(true).to(false)
|
||||
|
||||
|
@ -31,4 +32,16 @@ RSpec.describe AfterBlockDomainFromAccountService do
|
|||
expect(severed_relationships[0].relationship_severance_event).to eq severed_relationships[1].relationship_severance_event
|
||||
expect(severed_relationships.map { |rel| [rel.account, rel.target_account] }).to contain_exactly([wolf, alice], [alice, dog])
|
||||
end
|
||||
|
||||
describe 'streaming integration' do
|
||||
before do
|
||||
allow(redis).to receive(:publish)
|
||||
end
|
||||
|
||||
it 'notifies streaming of the domain blocks change' do
|
||||
subject.call(alice, target_domain)
|
||||
|
||||
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :domain_blocks_changed, account: alice.id, target_domain: target_domain)).once
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -48,4 +48,16 @@ RSpec.describe AfterBlockService do
|
|||
}.from([status.id.to_s, other_account_status.id.to_s, other_account_reblog.id.to_s]).to([other_account_status.id.to_s])
|
||||
end
|
||||
end
|
||||
|
||||
describe 'streaming integration' do
|
||||
before do
|
||||
allow(redis).to receive(:publish)
|
||||
end
|
||||
|
||||
it 'notifies streaming of the blocks change' do
|
||||
subject
|
||||
|
||||
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :blocks_changed, account: account.id, target_account: target_account.id)).once
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -7,6 +7,10 @@ RSpec.describe UnblockService do
|
|||
|
||||
let(:sender) { Fabricate(:account, username: 'alice') }
|
||||
|
||||
before do
|
||||
allow(redis).to receive(:publish)
|
||||
end
|
||||
|
||||
describe 'local' do
|
||||
let(:bob) { Fabricate(:account) }
|
||||
|
||||
|
@ -18,6 +22,10 @@ RSpec.describe UnblockService do
|
|||
it 'destroys the blocking relation' do
|
||||
expect(sender.blocking?(bob)).to be false
|
||||
end
|
||||
|
||||
it 'notifies streaming of the change in blocks' do
|
||||
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :blocks_changed, account: sender.id, target_account: bob.id))
|
||||
end
|
||||
end
|
||||
|
||||
describe 'remote ActivityPub' do
|
||||
|
@ -36,5 +44,9 @@ RSpec.describe UnblockService do
|
|||
it 'sends an unblock activity', :inline_jobs do
|
||||
expect(a_request(:post, 'http://example.com/inbox')).to have_been_made.once
|
||||
end
|
||||
|
||||
it 'notifies streaming of the change in blocks' do
|
||||
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :blocks_changed, account: sender.id, target_account: bob.id))
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -7,6 +7,10 @@ RSpec.describe UnmuteService do
|
|||
let!(:account) { Fabricate(:account) }
|
||||
let!(:target_account) { Fabricate(:account) }
|
||||
|
||||
before do
|
||||
allow(redis).to receive(:publish)
|
||||
end
|
||||
|
||||
context 'when account is muting target account' do
|
||||
before { Fabricate :mute, account: account, target_account: target_account }
|
||||
|
||||
|
@ -33,6 +37,12 @@ RSpec.describe UnmuteService do
|
|||
.from(true)
|
||||
.to(false)
|
||||
end
|
||||
|
||||
it 'notifies streaming of the change in mutes' do
|
||||
subject.call(account, target_account)
|
||||
|
||||
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :mutes_changed, account: account.id, target_account: target_account.id))
|
||||
end
|
||||
end
|
||||
|
||||
context 'when account is not muting target account' do
|
||||
|
@ -40,6 +50,8 @@ RSpec.describe UnmuteService do
|
|||
expect { subject.call(account, target_account) }
|
||||
.to_not(change { account.reload.muting?(target_account) })
|
||||
expect(MergeWorker).to_not have_enqueued_sidekiq_job(any_args)
|
||||
|
||||
expect(redis).to_not have_received(:publish)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
36
spec/workers/mute_worker_spec.rb
Normal file
36
spec/workers/mute_worker_spec.rb
Normal file
|
@ -0,0 +1,36 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
require 'rails_helper'
|
||||
|
||||
describe MuteWorker do
|
||||
subject { described_class.new.perform(account.id, target_account.id) }
|
||||
|
||||
let(:account) { Fabricate(:account) }
|
||||
let(:target_account) { Fabricate(:account) }
|
||||
|
||||
describe '#perform' do
|
||||
describe 'home timeline' do
|
||||
before do
|
||||
allow(FeedManager.instance).to receive(:clear_from_home)
|
||||
end
|
||||
|
||||
it "clears target account's statuses" do
|
||||
subject
|
||||
|
||||
expect(FeedManager.instance).to have_received(:clear_from_home).with(account, target_account)
|
||||
end
|
||||
end
|
||||
|
||||
describe 'streaming integration' do
|
||||
before do
|
||||
allow(redis).to receive(:publish)
|
||||
end
|
||||
|
||||
it 'notifies streaming of the change in mutes' do
|
||||
subject
|
||||
|
||||
expect(redis).to have_received(:publish).with('system', Oj.dump(event: :mutes_changed, account: account.id, target_account: target_account.id))
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Reference in a new issue