Prune rows in user_ips older than configured period
Defaults to pruning everything older than 28d.pull/6098/head
							parent
							
								
									2135c198d1
								
							
						
					
					
						commit
						242017db8b
					
				|  | @ -313,6 +313,12 @@ listeners: | |||
| # | ||||
| redaction_retention_period: 7d | ||||
| 
 | ||||
| # How long to track users' last seen time and IPs in the database. | ||||
| # | ||||
| # Defaults to `28d`. Set to `null` to disable. | ||||
| # | ||||
| #user_ips_max_age: 14d | ||||
| 
 | ||||
| 
 | ||||
| ## TLS ## | ||||
| 
 | ||||
|  |  | |||
|  | @ -172,6 +172,13 @@ class ServerConfig(Config): | |||
|         else: | ||||
|             self.redaction_retention_period = None | ||||
| 
 | ||||
|         # How long to keep entries in the `users_ips` table. | ||||
|         user_ips_max_age = config.get("user_ips_max_age", "28d") | ||||
|         if user_ips_max_age is not None: | ||||
|             self.user_ips_max_age = self.parse_duration(user_ips_max_age) | ||||
|         else: | ||||
|             self.user_ips_max_age = None | ||||
| 
 | ||||
|         # Options to disable HS | ||||
|         self.hs_disabled = config.get("hs_disabled", False) | ||||
|         self.hs_disabled_message = config.get("hs_disabled_message", "") | ||||
|  | @ -735,6 +742,12 @@ class ServerConfig(Config): | |||
|         # Defaults to `7d`. Set to `null` to disable. | ||||
|         # | ||||
|         redaction_retention_period: 7d | ||||
| 
 | ||||
|         # How long to track users' last seen time and IPs in the database. | ||||
|         # | ||||
|         # Defaults to `28d`. Set to `null` to disable. | ||||
|         # | ||||
|         #user_ips_max_age: 14d | ||||
|         """ | ||||
|             % locals() | ||||
|         ) | ||||
|  |  | |||
|  | @ -19,7 +19,7 @@ from six import iteritems | |||
| 
 | ||||
| from twisted.internet import defer | ||||
| 
 | ||||
| from synapse.metrics.background_process_metrics import run_as_background_process | ||||
| from synapse.metrics.background_process_metrics import wrap_as_background_process | ||||
| from synapse.util.caches import CACHE_SIZE_FACTOR | ||||
| 
 | ||||
| from . import background_updates | ||||
|  | @ -42,6 +42,8 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): | |||
| 
 | ||||
|         super(ClientIpStore, self).__init__(db_conn, hs) | ||||
| 
 | ||||
|         self.user_ips_max_age = hs.config.user_ips_max_age | ||||
| 
 | ||||
|         self.register_background_index_update( | ||||
|             "user_ips_device_index", | ||||
|             index_name="user_ips_device_id", | ||||
|  | @ -100,6 +102,9 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): | |||
|             "before", "shutdown", self._update_client_ips_batch | ||||
|         ) | ||||
| 
 | ||||
|         if self.user_ips_max_age: | ||||
|             self._clock.looping_call(self._prune_old_user_ips, 5 * 1000) | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def _remove_user_ip_nonunique(self, progress, batch_size): | ||||
|         def f(conn): | ||||
|  | @ -319,20 +324,19 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): | |||
| 
 | ||||
|         self._batch_row_update[key] = (user_agent, device_id, now) | ||||
| 
 | ||||
|     @wrap_as_background_process("update_client_ips") | ||||
|     def _update_client_ips_batch(self): | ||||
| 
 | ||||
|         # If the DB pool has already terminated, don't try updating | ||||
|         if not self.hs.get_db_pool().running: | ||||
|             return | ||||
| 
 | ||||
|         def update(): | ||||
|             to_update = self._batch_row_update | ||||
|             self._batch_row_update = {} | ||||
|             return self.runInteraction( | ||||
|                 "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update | ||||
|             ) | ||||
|         to_update = self._batch_row_update | ||||
|         self._batch_row_update = {} | ||||
| 
 | ||||
|         return run_as_background_process("update_client_ips", update) | ||||
|         return self.runInteraction( | ||||
|             "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update | ||||
|         ) | ||||
| 
 | ||||
|     def _update_client_ips_batch_txn(self, txn, to_update): | ||||
|         if "user_ips" in self._unsafe_to_upsert_tables or ( | ||||
|  | @ -496,3 +500,45 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): | |||
|             yield self._end_background_update("devices_last_seen") | ||||
| 
 | ||||
|         return updated | ||||
| 
 | ||||
|     @wrap_as_background_process("prune_old_user_ips") | ||||
|     async def _prune_old_user_ips(self): | ||||
|         """Removes entries in user IPs older than the configured period. | ||||
|         """ | ||||
| 
 | ||||
|         if not self.user_ips_max_age: | ||||
|             # Nothing to do | ||||
|             return | ||||
| 
 | ||||
|         if not await self.has_completed_background_update("devices_last_seen"): | ||||
|             # Only start pruning if we have finished populating the devices | ||||
|             # last seen info. | ||||
|             return | ||||
| 
 | ||||
|         # We do a slightly funky SQL delete to ensure we don't try and delete | ||||
|         # too much at once (as the table may be very large from before we | ||||
|         # started pruning). | ||||
|         # | ||||
|         # This works by finding the max last_seen that is less than the given | ||||
|         # time, but has no more than N rows before it, deleting all rows with | ||||
|         # a lesser last_seen time. (We COALESCE so that the sub-SELECT always | ||||
|         # returns exactly one row). | ||||
|         sql = """ | ||||
|             DELETE FROM user_ips | ||||
|             WHERE last_seen <= ( | ||||
|                 SELECT COALESCE(MAX(last_seen), -1) | ||||
|                 FROM ( | ||||
|                     SELECT last_seen FROM user_ips | ||||
|                     WHERE last_seen <= ? | ||||
|                     ORDER BY last_seen ASC | ||||
|                     LIMIT 5000 | ||||
|                 ) AS u | ||||
|             ) | ||||
|         """ | ||||
| 
 | ||||
|         timestamp = self.clock.time_msec() - self.user_ips_max_age | ||||
| 
 | ||||
|         def _prune_old_user_ips_txn(txn): | ||||
|             txn.execute(sql, (timestamp,)) | ||||
| 
 | ||||
|         await self.runInteraction("_prune_old_user_ips", _prune_old_user_ips_txn) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 Erik Johnston
						Erik Johnston