Create index on user_ips in the background
user_ips is kinda big, so really we want to add the index in the background once we're running. Replace the schema delta with one which will do that. I've done this in a way that's reasonably easy to reuse as there a few other indexes I need, and I don't suppose they will be the last.pull/945/head
							parent
							
								
									68a92afcff
								
							
						
					
					
						commit
						ec5717caf5
					
				|  | @ -14,6 +14,7 @@ | |||
| # limitations under the License. | ||||
| 
 | ||||
| from ._base import SQLBaseStore | ||||
| from . import engines | ||||
| 
 | ||||
| from twisted.internet import defer | ||||
| 
 | ||||
|  | @ -106,13 +107,13 @@ class BackgroundUpdateStore(SQLBaseStore): | |||
|                 ) | ||||
|             except: | ||||
|                 logger.exception("Error doing update") | ||||
| 
 | ||||
|             if result is None: | ||||
|                 logger.info( | ||||
|                     "No more background updates to do." | ||||
|                     " Unscheduling background update task." | ||||
|                 ) | ||||
|                 return | ||||
|             else: | ||||
|                 if result is None: | ||||
|                     logger.info( | ||||
|                         "No more background updates to do." | ||||
|                         " Unscheduling background update task." | ||||
|                     ) | ||||
|                     return | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def do_background_update(self, desired_duration_ms): | ||||
|  | @ -202,6 +203,64 @@ class BackgroundUpdateStore(SQLBaseStore): | |||
|         """ | ||||
|         self._background_update_handlers[update_name] = update_handler | ||||
| 
 | ||||
|     def register_background_index_update(self, update_name, index_name, | ||||
|                                          table, columns): | ||||
|         """Helper for store classes to do a background index addition | ||||
| 
 | ||||
|         To use: | ||||
| 
 | ||||
|         1. use a schema delta file to add a background update. Example: | ||||
|             INSERT INTO background_updates (update_name, progress_json) VALUES | ||||
|                 ('my_new_index', '{}'); | ||||
| 
 | ||||
|         2. In the Store constructor, call this method | ||||
| 
 | ||||
|         Args: | ||||
|             update_name (str): update_name to register for | ||||
|             index_name (str): name of index to add | ||||
|             table (str): table to add index to | ||||
|             columns (list[str]): columns/expressions to include in index | ||||
|         """ | ||||
| 
 | ||||
|         # if this is postgres, we add the indexes concurrently. Otherwise | ||||
|         # we fall back to doing it inline | ||||
|         if isinstance(self.database_engine, engines.PostgresEngine): | ||||
|             conc = True | ||||
|         else: | ||||
|             conc = False | ||||
| 
 | ||||
|         sql = "CREATE INDEX %(conc)s %(name)s ON %(table)s (%(columns)s)" \ | ||||
|               % { | ||||
|                   "conc": "CONCURRENTLY" if conc else "", | ||||
|                   "name": index_name, | ||||
|                   "table": table, | ||||
|                   "columns": ", ".join(columns), | ||||
|               } | ||||
| 
 | ||||
|         def create_index_concurrently(conn): | ||||
|             conn.rollback() | ||||
|             # postgres insists on autocommit for the index | ||||
|             conn.set_session(autocommit=True) | ||||
|             c = conn.cursor() | ||||
|             c.execute(sql) | ||||
|             conn.set_session(autocommit=False) | ||||
| 
 | ||||
|         def create_index(conn): | ||||
|             c = conn.cursor() | ||||
|             c.execute(sql) | ||||
| 
 | ||||
|         @defer.inlineCallbacks | ||||
|         def updater(progress, batch_size): | ||||
|             logger.info("Adding index %s to %s", index_name, table) | ||||
|             if conc: | ||||
|                 yield self.runWithConnection(create_index_concurrently) | ||||
|             else: | ||||
|                 yield self.runWithConnection(create_index) | ||||
|             yield self._end_background_update(update_name) | ||||
|             defer.returnValue(1) | ||||
| 
 | ||||
|         self.register_background_update_handler(update_name, updater) | ||||
| 
 | ||||
|     def start_background_update(self, update_name, progress): | ||||
|         """Starts a background update running. | ||||
| 
 | ||||
|  |  | |||
|  | @ -15,10 +15,11 @@ | |||
| 
 | ||||
| import logging | ||||
| 
 | ||||
| from ._base import SQLBaseStore, Cache | ||||
| 
 | ||||
| from twisted.internet import defer | ||||
| 
 | ||||
| from ._base import Cache | ||||
| from . import background_updates | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| # Number of msec of granularity to store the user IP 'last seen' time. Smaller | ||||
|  | @ -27,8 +28,7 @@ logger = logging.getLogger(__name__) | |||
| LAST_SEEN_GRANULARITY = 120 * 1000 | ||||
| 
 | ||||
| 
 | ||||
| class ClientIpStore(SQLBaseStore): | ||||
| 
 | ||||
| class ClientIpStore(background_updates.BackgroundUpdateStore): | ||||
|     def __init__(self, hs): | ||||
|         self.client_ip_last_seen = Cache( | ||||
|             name="client_ip_last_seen", | ||||
|  | @ -37,6 +37,14 @@ class ClientIpStore(SQLBaseStore): | |||
| 
 | ||||
|         super(ClientIpStore, self).__init__(hs) | ||||
| 
 | ||||
|         self.register_background_index_update( | ||||
|             "user_ips_device_index", | ||||
|             index_name="user_ips_device_id", | ||||
|             table="user_ips", | ||||
|             columns=["user_id", "device_id", "last_seen"], | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
|     @defer.inlineCallbacks | ||||
|     def insert_client_ip(self, user, access_token, ip, user_agent, device_id): | ||||
|         now = int(self._clock.time_msec()) | ||||
|  |  | |||
|  | @ -13,4 +13,5 @@ | |||
|  * limitations under the License. | ||||
|  */ | ||||
| 
 | ||||
| CREATE INDEX user_ips_device_id ON user_ips(user_id, device_id, last_seen); | ||||
| INSERT INTO background_updates (update_name, progress_json) VALUES | ||||
|   ('user_ips_device_index', '{}'); | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 Richard van der Hoff
						Richard van der Hoff