diff --git a/src/stores/OwnBeaconStore.ts b/src/stores/OwnBeaconStore.ts index b6ad63b9c7..e4c0d46a6d 100644 --- a/src/stores/OwnBeaconStore.ts +++ b/src/stores/OwnBeaconStore.ts @@ -48,11 +48,14 @@ const isOwnBeacon = (beacon: Beacon, userId: string): boolean => beacon.beaconIn export enum OwnBeaconStoreEvent { LivenessChange = 'OwnBeaconStore.LivenessChange', MonitoringLivePosition = 'OwnBeaconStore.MonitoringLivePosition', + WireError = 'WireError', } const MOVING_UPDATE_INTERVAL = 2000; const STATIC_UPDATE_INTERVAL = 30000; +const BAIL_AFTER_CONSECUTIVE_ERROR_COUNT = 2; + type OwnBeaconStoreState = { beacons: Map; beaconWireErrors: Map; @@ -65,9 +68,11 @@ export class OwnBeaconStore extends AsyncStoreWithClient { public readonly beacons = new Map(); public readonly beaconsByRoomId = new Map>(); /** - * Track over the wire errors for beacons + * Track over the wire errors for published positions + * Counts consecutive wire errors per beacon + * Reset on successful publish of location */ - public readonly beaconWireErrors = new Map(); + public readonly beaconWireErrorCounts = new Map(); private liveBeaconIds = []; private locationInterval: number; private geolocationError: GeolocationError | undefined; @@ -106,7 +111,7 @@ export class OwnBeaconStore extends AsyncStoreWithClient { this.beacons.clear(); this.beaconsByRoomId.clear(); this.liveBeaconIds = []; - this.beaconWireErrors.clear(); + this.beaconWireErrorCounts.clear(); } protected async onReady(): Promise { @@ -125,6 +130,25 @@ export class OwnBeaconStore extends AsyncStoreWithClient { return !!this.getLiveBeaconIds(roomId).length; } + /** + * If a beacon has failed to publish position + * past the allowed consecutive failure count (BAIL_AFTER_CONSECUTIVE_ERROR_COUNT) + * Then consider it to have an error + */ + public hasWireError(beaconId: string): boolean { + return this.beaconWireErrorCounts.get(beaconId) >= BAIL_AFTER_CONSECUTIVE_ERROR_COUNT; + } + + public resetWireError(beaconId: string): void { + this.incrementBeaconWireErrorCount(beaconId, false); + + // always publish to all live beacons together + // instead of just one that was changed + // to keep lastPublishedTimestamp simple + // and extra published locations don't hurt + this.publishCurrentLocationToBeacons(); + } + public getLiveBeaconIds(roomId?: string): string[] { if (!roomId) { return this.liveBeaconIds; @@ -202,6 +226,13 @@ export class OwnBeaconStore extends AsyncStoreWithClient { * State management */ + /** + * Live beacon ids that do not have wire errors + */ + private get healthyLiveBeaconIds() { + return this.liveBeaconIds.filter(beaconId => !this.hasWireError(beaconId)); + } + private initialiseBeaconState = () => { const userId = this.matrixClient.getUserId(); const visibleRooms = this.matrixClient.getVisibleRooms(); @@ -399,7 +430,7 @@ export class OwnBeaconStore extends AsyncStoreWithClient { */ private publishLocationToBeacons = async (position: TimedGeoUri) => { this.lastPublishedPositionTimestamp = Date.now(); - await Promise.all(this.liveBeaconIds.map(beaconId => + await Promise.all(this.healthyLiveBeaconIds.map(beaconId => this.sendLocationToBeacon(this.beacons.get(beaconId), position)), ); }; @@ -413,9 +444,35 @@ export class OwnBeaconStore extends AsyncStoreWithClient { const content = makeBeaconContent(geoUri, timestamp, beacon.beaconInfoId); try { await this.matrixClient.sendEvent(beacon.roomId, M_BEACON.name, content); + this.incrementBeaconWireErrorCount(beacon.identifier, false); } catch (error) { logger.error(error); - this.beaconWireErrors.set(beacon.identifier, error); + this.incrementBeaconWireErrorCount(beacon.identifier, true); + } + }; + + /** + * Manage beacon wire error count + * - clear count for beacon when not error + * - increment count for beacon when is error + * - emit if beacon error count crossed threshold + */ + private incrementBeaconWireErrorCount = (beaconId: string, isError: boolean): void => { + const hadError = this.hasWireError(beaconId); + + if (isError) { + // increment error count + this.beaconWireErrorCounts.set( + beaconId, + (this.beaconWireErrorCounts.get(beaconId) ?? 0) + 1, + ); + } else { + // clear any error count + this.beaconWireErrorCounts.delete(beaconId); + } + + if (this.hasWireError(beaconId) !== hadError) { + this.emit(OwnBeaconStoreEvent.WireError, beaconId); } }; } diff --git a/test/stores/OwnBeaconStore-test.ts b/test/stores/OwnBeaconStore-test.ts index d40708109e..3c924594f5 100644 --- a/test/stores/OwnBeaconStore-test.ts +++ b/test/stores/OwnBeaconStore-test.ts @@ -166,7 +166,7 @@ describe('OwnBeaconStore', () => { geolocation = mockGeolocation(); mockClient.getVisibleRooms.mockReturnValue([]); mockClient.unstable_setLiveBeacon.mockClear().mockResolvedValue({ event_id: '1' }); - mockClient.sendEvent.mockClear().mockResolvedValue({ event_id: '1' }); + mockClient.sendEvent.mockReset().mockResolvedValue({ event_id: '1' }); jest.spyOn(global.Date, 'now').mockReturnValue(now); jest.spyOn(OwnBeaconStore.instance, 'emit').mockRestore(); jest.spyOn(logger, 'error').mockRestore(); @@ -696,7 +696,7 @@ describe('OwnBeaconStore', () => { }); }); - describe('sending positions', () => { + describe('publishing positions', () => { it('stops watching position when user has no more live beacons', async () => { // geolocation is only going to emit 1 position geolocation.watchPosition.mockImplementation( @@ -825,6 +825,136 @@ describe('OwnBeaconStore', () => { }); }); + describe('when publishing position fails', () => { + beforeEach(() => { + geolocation.watchPosition.mockImplementation( + watchPositionMockImplementation([0, 1000, 3000, 3000, 3000]), + ); + + // eat expected console error logs + jest.spyOn(logger, 'error').mockImplementation(() => { }); + }); + + // we need to advance time and then flush promises + // individually for each call to sendEvent + // otherwise the sendEvent doesn't reject/resolve and update state + // before the next call + // advance and flush every 1000ms + // until given ms is 'elapsed' + const advanceAndFlushPromises = async (timeMs: number) => { + while (timeMs > 0) { + jest.advanceTimersByTime(1000); + await flushPromisesWithFakeTimers(); + timeMs -= 1000; + } + }; + + it('continues publishing positions after one publish error', async () => { + // fail to send first event, then succeed + mockClient.sendEvent.mockRejectedValueOnce(new Error('oups')).mockResolvedValue({ event_id: '1' }); + makeRoomsWithStateEvents([ + alicesRoom1BeaconInfo, + ]); + const store = await makeOwnBeaconStore(); + // wait for store to settle + await flushPromisesWithFakeTimers(); + + await advanceAndFlushPromises(50000); + + // called for each position from watchPosition + expect(mockClient.sendEvent).toHaveBeenCalledTimes(5); + expect(store.hasWireError(alicesRoom1BeaconInfo.getType())).toBe(false); + }); + + it('continues publishing positions when a beacon fails intermittently', async () => { + // every second event rejects + // meaning this beacon has more errors than the threshold + // but they are not consecutive + mockClient.sendEvent + .mockRejectedValueOnce(new Error('oups')) + .mockResolvedValueOnce({ event_id: '1' }) + .mockRejectedValueOnce(new Error('oups')) + .mockResolvedValueOnce({ event_id: '1' }) + .mockRejectedValueOnce(new Error('oups')); + + makeRoomsWithStateEvents([ + alicesRoom1BeaconInfo, + ]); + const store = await makeOwnBeaconStore(); + const emitSpy = jest.spyOn(store, 'emit'); + // wait for store to settle + await flushPromisesWithFakeTimers(); + + await advanceAndFlushPromises(50000); + + // called for each position from watchPosition + expect(mockClient.sendEvent).toHaveBeenCalledTimes(5); + expect(store.hasWireError(alicesRoom1BeaconInfo.getType())).toBe(false); + expect(emitSpy).not.toHaveBeenCalledWith( + OwnBeaconStoreEvent.WireError, alicesRoom1BeaconInfo.getType(), + ); + }); + + it('stops publishing positions when a beacon fails consistently', async () => { + // always fails to send events + mockClient.sendEvent.mockRejectedValue(new Error('oups')); + makeRoomsWithStateEvents([ + alicesRoom1BeaconInfo, + ]); + const store = await makeOwnBeaconStore(); + const emitSpy = jest.spyOn(store, 'emit'); + // wait for store to settle + await flushPromisesWithFakeTimers(); + + // 5 positions from watchPosition in this period + await advanceAndFlushPromises(50000); + + // only two allowed failures + expect(mockClient.sendEvent).toHaveBeenCalledTimes(2); + expect(store.hasWireError(alicesRoom1BeaconInfo.getType())).toBe(true); + expect(emitSpy).toHaveBeenCalledWith( + OwnBeaconStoreEvent.WireError, alicesRoom1BeaconInfo.getType(), + ); + }); + + it('restarts publishing a beacon after resetting wire error', async () => { + // always fails to send events + mockClient.sendEvent.mockRejectedValue(new Error('oups')); + makeRoomsWithStateEvents([ + alicesRoom1BeaconInfo, + ]); + const store = await makeOwnBeaconStore(); + const emitSpy = jest.spyOn(store, 'emit'); + // wait for store to settle + await flushPromisesWithFakeTimers(); + + // 3 positions from watchPosition in this period + await advanceAndFlushPromises(4000); + + // only two allowed failures + expect(mockClient.sendEvent).toHaveBeenCalledTimes(2); + expect(store.hasWireError(alicesRoom1BeaconInfo.getType())).toBe(true); + expect(emitSpy).toHaveBeenCalledWith( + OwnBeaconStoreEvent.WireError, alicesRoom1BeaconInfo.getType(), + ); + + // reset emitSpy mock counts to asser on wireError again + emitSpy.mockClear(); + store.resetWireError(alicesRoom1BeaconInfo.getType()); + + expect(store.hasWireError(alicesRoom1BeaconInfo.getType())).toBe(false); + + // 2 more positions from watchPosition in this period + await advanceAndFlushPromises(10000); + + // 2 from before, 2 new ones + expect(mockClient.sendEvent).toHaveBeenCalledTimes(4); + expect(emitSpy).toHaveBeenCalledWith( + OwnBeaconStoreEvent.WireError, alicesRoom1BeaconInfo.getType(), + ); + }); + }); + it('publishes subsequent positions', async () => { // modern fake timers + debounce + promises are not friends // just testing that positions are published