import type { concatMap } from 'rxjs';
import { Observable, ObservableInput, ObservedValueOf, OperatorFunction, from, map, mergeMap } from 'rxjs';

// https://stackoverflow.com/a/57045893
/** {@link concatMap}처럼 순서대로 결과가 나오나, 처리 자체는 들어오는 즉시 하는 operator */
export function concurrentConcatMap<T, O extends ObservableInput<any>>(
  project: (value: T, index: number) => O
): OperatorFunction<T, ObservedValueOf<O>> {
  return (source) =>
    new Observable<ObservedValueOf<O>>((subscriber) => {
      const buffer = new Map<number, ObservedValueOf<O>>();
      let nextEmitIndex = 0;

      return source.pipe(mergeMap((value, index) => from(project(value, index)).pipe(map((value2) => ({ value: value2, index }))))).subscribe({
        next({ value, index }) {
          if (index > nextEmitIndex) {
            buffer.set(index, value);
          } else {
            subscriber.next(value);
            while (buffer.has(++nextEmitIndex)) {
              subscriber.next(buffer.get(nextEmitIndex)!);
              buffer.delete(nextEmitIndex);
            }
          }
        },
        error(err) {
          subscriber.error(err);
        },
        complete() {
          subscriber.complete();
        },
      });
    });
}
