import { SextantStreamManager } from "./sextantStreamManager";
import { StreamManager } from "./streamManager";
import { StreamingCachePrintType } from "../dataflow/streamingCache";
import { Stream } from "./core/stream";
import { Message } from "src/gen/schema/syncfollow/syncfollow";
import { BilgeStreamManager } from "./bilgeStreamManager";

/**
 * StreamPool is an abstraction for StreamManager
 * subclassed. It makes it transparent to use an
 * individual StreamManager for a given pool of
 * StreamManagers
 */
export abstract class StreamPool<TKey, TStreamManager extends StreamManager> {
  protected maxSize: number = 10;
  protected pool = new Map<TKey, TStreamManager>();

  abstract useStream(id: TKey | undefined): TStreamManager | undefined;

  prune(m: TStreamManager | undefined) {
    if (m) {
      setTimeout(() => {
        this.pool.forEach((manager, key) => {
          if (m === manager) {
            if (!manager.isConnected()) {
              this.pool.delete(key);
            }
          }
        });
      }, 1000);
    }
  }

  close(id: TKey) {
    const socket = this.pool.get(id);
    if (socket) {
      socket.close();
    }
  }

  getStreams(): Stream<Message, Message>[] {
    const data: Stream<Message, Message>[] = [];
    this.pool.forEach((socket: StreamManager) => {
      const tmp = socket.getStreams();
      data.push(...tmp);
    }, []);
    return data;
  }

  showContent(): StreamingCachePrintType[] {
    const data: StreamingCachePrintType[] = [];
    this.pool.forEach((socket: StreamManager) => {
      const tmp = socket.showContent();
      data.push(...tmp);
    }, []);
    return data;
  }
}

/**
 * StreamManager specific implementation
 * of the StreamPool class.
 */
export class SextantStreamPool extends StreamPool<
  string | undefined,
  SextantStreamManager
> {
  /**
   * This will return the
   * SextantStreamManager associated with
   * the given id. If one does not exist
   * it will create one.
   */
  useStream(id: string | undefined) {
    if (id) {
      let socket: SextantStreamManager | undefined = this.pool.get(id);
      if (!socket) {
        socket = new SextantStreamManager();
        socket.on(StreamManager.PRUNE, () => this.prune(socket), this);
        this.pool.set(id, socket);
      }
      if (!socket.isConnected()) {
        socket.init(id);
      }
      return socket;
    }
    return undefined;
  }
}

/**
 * StreamManager specific implementation
 * of the StreamPool class.
 */
export class BilgeStreamPool extends StreamPool<
  string | undefined,
  BilgeStreamManager
> {
  /**
   * This will return the
   * BilgeStreamManager associated with
   * the given id. If one does not exist
   * it will create one.
   */
  useStream(id: string | undefined) {
    if (id) {
      let socket: BilgeStreamManager | undefined = this.pool.get(id);
      if (!socket) {
        socket = new BilgeStreamManager();
        socket.on(StreamManager.PRUNE, () => this.prune(socket), this);
        this.pool.set(id, socket);
      }
      if (!socket.isConnected()) {
        socket.init();
      }
      return socket;
    }
    return undefined;
  }
}
