import { useContext, useEffect, useMemo } from 'react'
import {
  asyncScheduler,
  BehaviorSubject,
  combineLatestWith,
  concatMap,
  connectable,
  debounce,
  distinctUntilChanged,
  EMPTY,
  filter,
  fromEvent,
  interval,
  map,
  mergeWith,
  Observable,
  observeOn,
  onErrorResumeNextWith,
  OperatorFunction,
  retry,
  scan,
  share,
  startWith,
  Subject,
  Subscription,
  switchMap,
  take,
  throwError,
  timer,
  TruthyTypesOf,
  withLatestFrom
} from 'rxjs'
import { fromArrayLike, fromPromise } from 'rxjs/internal/observable/innerFrom'
import { useQueryClient } from '@tanstack/react-query'
import { currentUser } from '../auth/states'
import { apiDomains } from '../global-vars'
import { useSession } from '../room/states'
import { tuple } from '../types'

/**
 * Turns a stream of WebSocket factories into a stream of live connections.
 *
 * Each factory emitted by `ctors` replaces the current connection: the
 * previous socket is closed and a new one is created. The socket is emitted
 * once its `open` event fires. When a connection fails, `undefined` is emitted
 * immediately and a fresh socket is created from the same factory one second
 * later; this repeats until the factory is replaced or the result is
 * unsubscribed. An `undefined` factory closes the current connection and emits
 * nothing.
 *
 * A connection counts as failed when the socket fires `error` or `close`, when
 * `send` is called while the socket is not open, or when 35 seconds pass after
 * a message without another message arriving.
 *
 * @param ctors Stream of socket factories; `undefined` means "no connection".
 * @returns Stream of open sockets, with `undefined` marking a lost connection.
 */
export function connectWs(ctors: Observable<(() => WebSocket) | undefined>): Observable<WebSocket | undefined> {
  return ctors.pipe(
    // switchMap unsubscribes the previous inner observable, whose teardown closes the old socket.
    switchMap(function operator(ctor: (() => WebSocket) | undefined): Observable<WebSocket | undefined> {
      if (!ctor) return EMPTY
      return new Observable<WebSocket | undefined>((s) => {
        const w = ctor()
        const { send } = w
        // Failures detected by this function itself (as opposed to socket events).
        const onError = new Subject<Error>()
        // Guarded send: instead of calling the native method on a socket that
        // is not open, report a failure so that the connection is re-established.
        w.send = function (...args) {
          switch (w.readyState) {
            case WebSocket.OPEN:
              send.apply(this, args)
              break
            default:
              onError.next(new Error(`websocket is in state ${w.readyState}`))
              break
          }
        }
        // Watchdog: debounce re-arms on every message, so this fires once 35 s
        // elapse without a further message. It is not armed before the first
        // message has arrived.
        s.add(
          fromEvent(w, 'message')
            .pipe(
              debounce(() => interval(35_000)),
              map(() => new Error('connection lost'))
            )
            .subscribe(onError)
        )
        // Publish the socket to the consumer as soon as it is open.
        s.add(
          fromEvent(w, 'open')
            .pipe(map(() => w))
            .subscribe(s)
        )
        // Keep-alive: one frame right after opening, then one every 15 s.
        s.add(
          fromEvent(w, 'open')
            .pipe(
              take(1),
              switchMap(() => interval(15_000).pipe(startWith(undefined)))
            )
            .subscribe(() => w.send(JSON.stringify({ type: 'keep-alive' })))
        )
        // Any failure signal errors this observable. The explicit factory keeps
        // switchMap's index argument from being passed to throwError as a scheduler.
        s.add(
          fromEvent(w, 'error')
            .pipe(
              mergeWith(fromEvent(w, 'close')),
              mergeWith(onError),
              switchMap((cause) => throwError(() => cause))
            )
            .subscribe(s)
        )
        return () => w.close()
      }).pipe(
        // On failure: report "no socket" right away, then reconnect with the same factory after 1 s.
        onErrorResumeNextWith(
          timer(1000).pipe(
            switchMap(() => operator(ctor)),
            startWith<WebSocket | undefined>(undefined)
          )
        )
      )
    })
  )
}

/**
 * Pairs every source value with the latest *truthy* value of `a`.
 *
 * - While the latest value of `a` is truthy, each source value is emitted
 *   immediately as `[value, latestA]`.
 * - While `a` has not emitted yet, or its latest value is falsy, source values
 *   are buffered. As soon as `a` emits a truthy value, the buffered values are
 *   emitted in arrival order, paired with that value.
 *
 * `a` is subscribed before the source, so a synchronously emitting `a` (for
 * example a `BehaviorSubject`) is already known when the first source value
 * arrives. The result completes when the source completes; values still
 * buffered at that point are discarded. Errors from either observable are
 * forwarded.
 *
 * @param a Observable whose latest truthy value accompanies each source value.
 * @returns Operator emitting `[sourceValue, truthyA]` tuples.
 */
export function withLatestTruthy<A, B>(a: Observable<A>): OperatorFunction<B, [B, TruthyTypesOf<A>]> {
  return (source) =>
    new Observable<[B, TruthyTypesOf<A>]>((subscriber) => {
      let latest: A | undefined
      let pending: B[] = []
      // Callers only pass values they have already checked to be truthy.
      const emit = (b: B, value: A | undefined) => subscriber.next(tuple(b, value as TruthyTypesOf<A>))

      subscriber.add(
        a.subscribe({
          next(value) {
            latest = value
            if (!value || pending.length === 0) return
            // Swap the buffer out before emitting so that re-entrant source
            // emissions triggered by a consumer are not flushed twice.
            const queued = pending
            pending = []
            for (const b of queued) emit(b, value)
          },
          error: (err: unknown) => subscriber.error(err)
        })
      )
      // `a` may have errored synchronously; do not touch the source in that case.
      if (subscriber.closed) return
      subscriber.add(
        source.subscribe({
          next(b) {
            if (latest) emit(b, latest)
            else pending.push(b)
          },
          error: (err: unknown) => subscriber.error(err),
          complete: () => subscriber.complete()
        })
      )
    })
}

/** Message types that are ignored, i.e. that never trigger a cache invalidation. */
const IGNORED_SESSION_MESSAGE_TYPES = new Set<unknown>([
  'keep-alive',
  'text',
  'room_connected',
  'room_disconnected',
  'query_room_status',
  'healthcheck_live_message',
  'microphone_control'
])

/**
 * Tells whether a raw socket frame should trigger a refetch.
 *
 * Never throws: frames that are not valid JSON, or that do not parse to an
 * object, are ignored. Any other frame triggers a refetch unless its `type` is
 * one of the ignored message types.
 */
function shouldInvalidateOn(raw: string): boolean {
  let parsed: unknown
  try {
    parsed = JSON.parse(raw)
  } catch {
    return false
  }
  if (typeof parsed !== 'object' || parsed === null) return false
  return !IGNORED_SESSION_MESSAGE_TYPES.has((parsed as { type?: unknown }).type)
}

/**
 * WebSocket connection for live-session control.
 *
 * The socket is only opened while the current session has a `room_id`; the
 * URL is derived from the CMS API domain with its scheme replaced by `wss://`.
 * Every incoming message whose type is not in the ignore list invalidates the
 * `/session` and `/permission` queries, one message at a time.
 *
 * @returns `[messages, ws]` as returned by `useWebsocket`.
 */
export function useSessionConnection() {
  const session = useSession()
  const { cms: domain } = useContext(apiDomains)
  const queryClient = useQueryClient()
  const user = useContext(currentUser)
  // NOTE(review): the token is interpolated without URL-encoding — confirm it
  // only ever contains URL-safe characters.
  const [messages, ws] = useWebsocket(
    session?.room_id ? `${domain.replace(/^(?:https?:)?\/\//, 'wss://')}/session?token=${user.token}` : ''
  )
  useEffect(() => {
    const subscription = new Subscription()
    // Frames are handled in a later task than the socket event that delivered
    // them. Nothing upstream of the shared subject can throw, so `retry()`
    // below always resubscribes to a live subject.
    const frames = connectable(messages.pipe(observeOn(asyncScheduler)))
    subscription.add(
      frames
        .pipe(
          filter(shouldInvalidateOn),
          // concatMap serializes refetches: the next one starts only after the previous one settled.
          concatMap(() =>
            fromPromise(
              Promise.all([queryClient.invalidateQueries(['/session']), queryClient.invalidateQueries(['/permission'])])
            )
          ),
          // A failed invalidation must not end the handling of later messages.
          retry()
        )
        .subscribe()
    )
    subscription.add(frames.connect())
    return () => subscription.unsubscribe()
  }, [messages, queryClient])
  return tuple(messages, ws)
}

/**
 * Keeps a WebSocket to `url` connected (via `connectWs`) while the component
 * is mounted and exposes its messages.
 *
 * @param url Endpoint to connect to; an empty string means "no connection".
 * @returns A tuple `[messages, use]`:
 *   - `messages` emits the `data` of every `message` event of the current socket;
 *   - `use(action)` queues `action`, which is invoked with a socket as paired
 *     by `withLatestTruthy` against the current connection.
 */
export function useWebsocket(url: string): [Observable<string>, (_: (_: WebSocket) => void) => void] {
  // All subjects are created once per component instance.
  const ctors = useMemo(() => new Subject<(() => WebSocket) | undefined>(), [])
  const ws = useMemo(() => new BehaviorSubject<WebSocket | undefined>(undefined), [])
  const actions = useMemo(() => new Subject<(_: WebSocket) => void>(), [])
  const callUseWs = useMemo(() => (action: (_: WebSocket) => void) => actions.next(action), [actions])
  const messages = useMemo(
    () =>
      ws.pipe(
        // Follow whichever socket is current; no messages while disconnected.
        switchMap((socket) =>
          socket ? fromEvent<WebSocketEventMap['message']>(socket, 'message').pipe(map((ev) => ev.data)) : EMPTY
        )
      ),
    [ws]
  )
  // The effects below run in declaration order: both subscriptions are in
  // place before the first factory is pushed into `ctors`.
  useEffect(() => {
    const s = ctors.pipe(connectWs).subscribe(ws)
    return () => {
      s.unsubscribe()
      // Unsubscribing closes the socket; do not leave the closed socket behind
      // as the current value.
      ws.next(undefined)
    }
  }, [ctors, ws])
  useEffect(() => {
    const s = actions.pipe(withLatestTruthy(ws)).subscribe(([use, socket]) => use(socket))
    return () => s.unsubscribe()
  }, [ws, actions])
  useEffect(() => {
    // Pushing a new factory closes the previous socket, so forget it first;
    // the new socket is published once it has opened.
    ws.next(undefined)
    ctors.next(url ? () => new WebSocket(url) : undefined)
  }, [url, ctors, ws])
  return [messages, callUseWs]
}
