From 4bd2728d7e9e70f6a5c481038d17cd4ba04bd1fc Mon Sep 17 00:00:00 2001 From: Emelia Smith Date: Mon, 8 Jul 2024 23:57:08 +0200 Subject: [PATCH] Streaming: fetch keyword filters on connection --- streaming/index.js | 172 ++++++++++++++++++++++++++++----------------- 1 file changed, 106 insertions(+), 66 deletions(-) diff --git a/streaming/index.js b/streaming/index.js index dd9ea0c7f68..556f4d9a081 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -35,6 +35,29 @@ dotenv.config({ initializeLogLevel(process.env, environment); + +/** + * @typedef Filter + * @property {number} id + * @property {string} title + * @property {string[]} context + * @property {Date} expires_at + * @property {'warn' | 'hide'} filter_action + */ + +/** + * @typedef KeywordFilter + * @property {Array<[string, boolean]>} keywords + * @property {Date} expires_at + * @property {Filter} filter + * @property {RegExp} regexp + */ + +/** + * @typedef Filters + * @type {Object} + */ + /** * Declares the result type for accountFromToken / accountFromRequest. * @@ -46,6 +69,7 @@ initializeLogLevel(process.env, environment); * @property {string} accountId * @property {string[]} chosenLanguages * @property {string} deviceId + * @property {Filters} cachedFilters */ /** @@ -537,12 +561,15 @@ const startServer = async () => { req.chosenLanguages = result.rows[0].chosen_languages; req.deviceId = result.rows[0].device_id; + req.cachedFilters = await fetchFilters(req.accountId); + return { accessTokenId: result.rows[0].id, scopes: result.rows[0].scopes.split(' '), accountId: result.rows[0].account_id, chosenLanguages: result.rows[0].chosen_languages, - deviceId: result.rows[0].device_id + deviceId: result.rows[0].device_id, + cachedFilters: req.cachedFilters }; }; @@ -636,6 +663,76 @@ const startServer = async () => { reject(new AuthenticationError('Access token does not have the required scopes')); }); + + const FILTER_ACTIONS = [ + 'warn', + 'hide' + ]; + + /** + * + * @param {string} accountId + * @returns {Promise} + */ + const fetchFilters = async (accountId) => { + const results = await pgPool.query('SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())', [accountId]); + + if (results.rows.length === 0) { + return {}; + } + + const filters = results.rows.reduce((filters, filter) => { + if (filters[filter.id]) { + filters[filter.id].keywords.push([filter.keyword, filter.whole_word]); + } else { + filters[filter.id] = { + keywords: [[filter.keyword, filter.whole_word]], + expires_at: filter.expires_at, + filter: { + id: filter.id, + title: filter.title, + context: filter.context, + expires_at: filter.expires_at, + // filter.filter_action is the value from the + // custom_filters.action database column, it is an integer + // representing a value in an enum defined by Ruby on Rails: + // + // enum { warn: 0, hide: 1 } + filter_action: FILTER_ACTIONS[filter.filter_action], + }, + }; + } + + return filters; + }, {}); + + // Construct the regular expressions for the custom filters: This + // needs to be done in a separate loop as the database returns one + // filterRow per keyword, so we need all the keywords before + // constructing the regular expression + // @ts-ignore + Object.keys(filters).forEach((id) => { + // @ts-ignore + filters[id].regexp = new RegExp(filters[id].keywords.map(([keyword, whole_word]) => { + let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); + + if (whole_word) { + if (/^[\w]/.test(expr)) { + expr = `\\b${expr}`; + } + + if (/[\w]$/.test(expr)) { + expr = `${expr}\\b`; + } + } + + return expr; + }).join('|'), 'i'); + }); + + return filters; + }; + /** * @typedef SystemMessageHandlers * @property {function(): void} onKill @@ -661,7 +758,12 @@ const startServer = async () => { eventHandlers.onKill(); } else if (event === 'filters_changed') { req.log.debug(`Invalidating filters cache for ${req.accountId}`); - req.cachedFilters = null; + + fetchFilters(req.accountId).then((filters) => { + req.cachedFilters = filters; + }).catch((err) => { + req.log.error({ err }, 'Error refreshing filters'); + }); } }; }; @@ -865,12 +967,6 @@ const startServer = async () => { queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain])); } - // @ts-ignore - if (!payload.filtered && !req.cachedFilters) { - // @ts-ignore - queries.push(client.query('SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())', [req.accountId])); - } - Promise.all(queries).then(values => { releasePgConnection(); @@ -888,67 +984,10 @@ const startServer = async () => { return; } - // Handling for constructing the custom filters and caching them on the request - // TODO: Move this logic out of the message handling lifecycle - // @ts-ignore - if (!req.cachedFilters) { - const filterRows = values[accountDomain ? 2 : 1].rows; - - // @ts-ignore - req.cachedFilters = filterRows.reduce((cache, filter) => { - if (cache[filter.id]) { - cache[filter.id].keywords.push([filter.keyword, filter.whole_word]); - } else { - cache[filter.id] = { - keywords: [[filter.keyword, filter.whole_word]], - expires_at: filter.expires_at, - filter: { - id: filter.id, - title: filter.title, - context: filter.context, - expires_at: filter.expires_at, - // filter.filter_action is the value from the - // custom_filters.action database column, it is an integer - // representing a value in an enum defined by Ruby on Rails: - // - // enum { warn: 0, hide: 1 } - filter_action: ['warn', 'hide'][filter.filter_action], - }, - }; - } - - return cache; - }, {}); - - // Construct the regular expressions for the custom filters: This - // needs to be done in a separate loop as the database returns one - // filterRow per keyword, so we need all the keywords before - // constructing the regular expression - // @ts-ignore - Object.keys(req.cachedFilters).forEach((key) => { - // @ts-ignore - req.cachedFilters[key].regexp = new RegExp(req.cachedFilters[key].keywords.map(([keyword, whole_word]) => { - let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); - - if (whole_word) { - if (/^[\w]/.test(expr)) { - expr = `\\b${expr}`; - } - - if (/[\w]$/.test(expr)) { - expr = `${expr}\\b`; - } - } - - return expr; - }).join('|'), 'i'); - }); - } - // Apply cachedFilters against the payload, constructing a // `filter_results` array of FilterResult entities // @ts-ignore - if (req.cachedFilters) { + if (Object.keys(req.cachedFilters).length) { const status = payload; // TODO: Calculate searchableContent in Ruby on Rails: // @ts-ignore @@ -974,6 +1013,7 @@ const startServer = async () => { // null as we only are only applying the keyword-based custom // filters, not the status-based custom filters. // https://docs.joinmastodon.org/entities/FilterResult/ + // @ts-ignore results.push({ filter: cachedFilter.filter, keyword_matches,