import { Socket } from "./core/socket";
import { Stream, StreamReducer, StreamTransformer } from "./core/stream";
import {
  MarkerType,
  Message,
  QueryType,
  Request
} from "src/gen/schema/syncfollow/syncfollow";
import { StreamManager } from "./streamManager";
import { unreachable } from "src/utils/switch";

/**
 * Grace period, in milliseconds, applied before an unused Stream is
 * torn down and before an idle Socket is closed.
 */
const TEARDOWN_DELAY_MS = 10000;

/**
 * Stream manager that multiplexes sync-and-follow streams over a single
 * Socket. Streams are registered by their `syncId`; incoming websocket
 * messages are routed to the matching Stream based on the `syncId` carried
 * in the message.
 */
export abstract class SyncFollowStreamManager extends StreamManager {
  protected sock: Socket = new Socket();
  protected streams: Map<string, Stream<Message, Message>> = new Map();

  /**
   * Creates a Stream for `message`, registers it under its `syncId` and
   * sends the stringified request over the socket to start the
   * sync-and-follow.
   *
   * @param message - Request message the Stream is built from.
   * @param transformer - Transformer handed to the Stream.
   * @param reducer - Reducer handed to the Stream.
   * @returns The newly created Stream.
   */
  addStream<TMessage extends Message, TResponse>(
    message: TMessage,
    transformer: StreamTransformer<TResponse>,
    reducer: StreamReducer<TResponse>
  ): Stream<Message, Message> | undefined {
    const stream = new Stream(message, transformer, reducer);
    /**
     * When the Stream signals that it is no longer needed, the manager
     * has to clean up and remove it.
     */
    stream.on(Stream.END, this.cleanup, this);
    /* Register the new stream in the streams Map. */
    this.streams.set(stream.syncId, stream);
    /* Send the request as a string to the websocket to start the sync. */
    this.sock?.send(stream.getStringifiedRequest());
    return stream;
  }

  /**
   * Schedules the removal of a Stream. The removal is delayed so that a
   * client added within the grace period can reuse the existing Stream.
   *
   * @param s - The Stream that reported it is no longer needed.
   */
  cleanup(s: Stream<Message, Message>): void {
    setTimeout(() => {
      /**
       * A Stream can end more than once (end -> reused -> end again), which
       * schedules several timers. Only act if this exact Stream is still
       * the one registered under its syncId; otherwise an earlier timer has
       * already removed it (or the id now belongs to another Stream) and a
       * second Stop / delete / prune would be wrong.
       */
      if (this.streams.get(s.syncId) !== s) {
        return;
      }
      /**
       * If the Stream still has followers it is in use again; leave it
       * alone until they are all removed.
       */
      if (s.followers() !== 0) {
        return;
      }

      const stopRequest = JSON.stringify(
        Message.fromPartial({
          request: Request.fromPartial({
            syncId: s.syncId,
            query: QueryType.Stop
          })
        })
      );
      /* Send the websocket the stop query. */
      this.sock?.send(stopRequest);
      /* Remove the registry entry for this Stream. */
      this.streams.delete(s.syncId);
      /* If there are no more streams, close the connection. */
      if (this.streams.size === 0) {
        this.prune();
      }
    }, TEARDOWN_DELAY_MS);
  }

  /**
   * Schedules closing the socket. The close is delayed so that a Stream
   * added within the grace period can reuse the existing Socket; when the
   * delay elapses and there are still no streams, a `StreamManager.PRUNE`
   * event is fired and the socket is closed.
   */
  prune(): void {
    setTimeout(() => {
      if (this.streams.size === 0) {
        this.fireEvent(StreamManager.PRUNE, this);
        this.sock.close();
      }
    }, TEARDOWN_DELAY_MS);
  }

  /**
   * Parses an incoming websocket message and routes it to the Stream
   * registered under the message's `syncId`. Messages for unknown
   * `syncId`s are dropped.
   *
   * @param m - Websocket message event whose `data` is a JSON string.
   */
  handleMessage(m: MessageEvent<string>): void {
    // Create object from json payload, this will fill in all the default values.
    const msg = Message.fromJSON(JSON.parse(m.data));

    if (msg.error) {
      /**
       * Need to add some kind of global error handler here. It does not
       * seem to make sense to have the websocket client handle this.
       */
      const stream = this.streams.get(msg.error.syncId);
      stream?.handleError(msg);
      return;
    }

    if (!msg.data) {
      return;
    }

    const { marker = MarkerType.None, syncId } = msg.data;
    if (!syncId) {
      return;
    }

    const stream = this.streams.get(syncId);
    if (!stream) {
      return;
    }

    switch (marker) {
      case MarkerType.StartOfSync:
        /**
         * According to backend the data should be reset and rebuilt if
         * you get a StartOfSync marker.
         */
        stream.loading = true;
        stream.data = [];
        break;
      case MarkerType.ResumeFailed:
        /* Resume failed: send the request again to re-sync. */
        this.sock?.send(stream.getStringifiedRequest());
        break;
      case MarkerType.EndOfSync:
      case MarkerType.StartOfFollow:
        stream.loading = false;
        break;
      case MarkerType.UpToDate:
        /**
         * When we are up to date make sure we send the data, so calls do
         * not get stuck in limbo.
         * NOTE(review): this passes stream.data rather than a Message like
         * the MarkerType.None branch does - confirm Stream.handleMessage
         * accepts it.
         */
        stream.handleMessage(stream.data);
        break;
      case MarkerType.None:
        /**
         * Data messages have no marker. Process the message and let the
         * stream pass data to its consumer(s).
         */
        stream.handleMessage(msg);
        break;
      case MarkerType.UNRECOGNIZED:
      case MarkerType.StillAlive:
        /* We ignore these messages. */
        break;
      default:
        /**
         * This will cause a compile time error if a new MarkerType is
         * added and not handled.
         */
        unreachable(marker);
    }
  }

  /**
   * Re-sends the request of every registered Stream over the socket.
   * The reconnect logic is not finished yet.
   */
  handleReconnect(): void {
    this.streams.forEach((stream: Stream<Message, Message>) => {
      this.sock?.send(stream.getStringifiedRequest());
    });
  }
}
