mirror of https://github.com/Chocobozzz/PeerTube
Prevent concurrency issues when sending m3u8 file
parent
0177101284
commit
b3ce36069f
|
@ -3,6 +3,7 @@ import { mapSeries } from 'bluebird'
|
||||||
import { FSWatcher, watch } from 'chokidar'
|
import { FSWatcher, watch } from 'chokidar'
|
||||||
import { FfmpegCommand } from 'fluent-ffmpeg'
|
import { FfmpegCommand } from 'fluent-ffmpeg'
|
||||||
import { appendFile, ensureDir, readFile, stat } from 'fs-extra'
|
import { appendFile, ensureDir, readFile, stat } from 'fs-extra'
|
||||||
|
import PQueue from 'p-queue'
|
||||||
import { basename, join } from 'path'
|
import { basename, join } from 'path'
|
||||||
import { EventEmitter } from 'stream'
|
import { EventEmitter } from 'stream'
|
||||||
import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg'
|
import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg'
|
||||||
|
@ -21,7 +22,6 @@ import { LiveSegmentShaStore } from '../live-segment-sha-store'
|
||||||
import { buildConcatenatedName } from '../live-utils'
|
import { buildConcatenatedName } from '../live-utils'
|
||||||
|
|
||||||
import memoizee = require('memoizee')
|
import memoizee = require('memoizee')
|
||||||
|
|
||||||
interface MuxingSessionEvents {
|
interface MuxingSessionEvents {
|
||||||
'live-ready': (options: { videoId: number }) => void
|
'live-ready': (options: { videoId: number }) => void
|
||||||
|
|
||||||
|
@ -278,11 +278,18 @@ class MuxingSession extends EventEmitter {
|
||||||
private watchM3U8File () {
|
private watchM3U8File () {
|
||||||
this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8')
|
this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8')
|
||||||
|
|
||||||
|
const sendQueues = new Map<string, PQueue>()
|
||||||
|
|
||||||
const onChangeOrAdd = async (m3u8Path: string) => {
|
const onChangeOrAdd = async (m3u8Path: string) => {
|
||||||
if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
|
if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await storeHLSFileFromPath(this.streamingPlaylist, m3u8Path)
|
if (!sendQueues.has(m3u8Path)) {
|
||||||
|
sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
|
||||||
|
}
|
||||||
|
|
||||||
|
const queue = sendQueues.get(m3u8Path)
|
||||||
|
await queue.add(() => storeHLSFileFromPath(this.streamingPlaylist, m3u8Path))
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
|
logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
|
||||||
}
|
}
|
||||||
|
|
|
@ -314,7 +314,7 @@ describe('Test syndication feeds', () => {
|
||||||
const jsonObj = JSON.parse(json)
|
const jsonObj = JSON.parse(json)
|
||||||
const imageUrl = jsonObj.icon
|
const imageUrl = jsonObj.icon
|
||||||
expect(imageUrl).to.include('/lazy-static/avatars/')
|
expect(imageUrl).to.include('/lazy-static/avatars/')
|
||||||
await makeRawRequest({ url: imageUrl })
|
await makeRawRequest({ url: imageUrl, expectedStatus: HttpStatusCode.OK_200 })
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue