import { Error, Message, Request } from "src/gen/schema/syncfollow/syncfollow";
import { MD5 } from "src/utils/md5";
import { Observable } from "./observable";
import { ErrorHandler, StreamClient, UpdateHandler } from "./streamClient";
import { logger } from "src/core/globals";
import { ApiError } from "src/core/apiError";

/**
 * Converts a raw websocket {@link Message} into the payload type a
 * stream accumulates.
 *
 * Return `undefined` when the message contributes nothing; the stream
 * then leaves its accumulated data unchanged for that message.
 */
export type StreamTransformer<TResponse> = (
  resp: Message
) => TResponse | undefined;

/**
 * Folds a freshly transformed value into the value accumulated so far
 * and returns the new accumulated value.
 *
 * `prev` is `undefined` until the first value has been accumulated.
 */
export type StreamReducer<TResponse> = (
  prev: TResponse | undefined,
  curr: TResponse
) => TResponse;

/** Names of the events a Stream registers and fires; currently only "end". */
type EVT = "end";

/**
 * Manages a single streaming request: accumulates the data carried by
 * incoming messages, broadcasts every update (and non-404 error) to the
 * subscribed clients, and announces via the {@link Stream.END} event
 * when it has been left without clients for a grace period.
 *
 * @typeParam TRequest - Message type wrapping the streaming request.
 * @typeParam TResponse - Payload type produced by the transformer and
 * accumulated by the reducer.
 */
export class Stream<
  TRequest extends Message,
  TResponse
  /* eslint-disable-next-line @typescript-eslint/no-explicit-any */
> extends Observable<any> {
  /** Event fired (with the stream itself as argument) once the stream is idle. */
  static END: EVT = "end";

  /**
   * How long, in milliseconds, the stream may stay without clients
   * before END is fired.
   */
  private static readonly END_DELAY_MS = 30000;

  /** Subscribed clients, keyed by client id. */
  private clients: Map<string, StreamClient> = new Map();
  /** Number of messages processed by handleMessage. */
  public count: number = 0;
  /** Identifier of this stream: the MD5 hash of the request message. */
  public syncId: string = "";
  /** Time at which handleMessage last processed a message. */
  public lastMessage?: Date;

  /** Transforms a message and folds the result into `data`. */
  private parseMessage: (m: Message) => void;
  /** The streaming Request wrapped in a Message. */
  private message: TRequest;
  /**
   * Handle of the pending END timer, or `undefined` when none is
   * scheduled. Keeping a single handle guarantees END fires at most
   * once per idle period and never earlier than the full grace period
   * after the last client left.
   */
  private endTimer?: ReturnType<typeof setTimeout>;
  /**
   * Latest accumulated value (frozen with Object.freeze), or
   * `undefined` until the first value has been accumulated.
   */
  /* eslint-disable-next-line @typescript-eslint/no-explicit-any */
  public data: any;
  /**
   * Loading flag forwarded to clients with every update. This class
   * never modifies it.
   */
  public loading: boolean = false;

  /**
   * @param message - The streaming Request wrapped in a Message.
   * @param transformer - Extracts the payload from an incoming message;
   * returning `undefined` skips the message.
   * @param reducer - Merges a new payload into the accumulated data.
   */
  constructor(
    message: TRequest,
    transformer: StreamTransformer<TResponse>,
    reducer: StreamReducer<TResponse>
  ) {
    super();
    /**
     * Need to be able to message the StreamManager
     * that this stream is done. This is done using
     * the END Event
     */
    this.addEvents([Stream.END]);
    this.message = message;
    /**
     * Need to get a little creative here, making
     * the syncId the hashed request.
     */
    this.syncId = MD5.hashJson(message);

    /**
     * Internal method to parse the message
     * before it is broadcast to the consumers
     */
    this.parseMessage = (m: Message) => {
      const data = transformer(m);
      /**
       * Only a missing value means "skip". A truthiness
       * check here would silently drop legitimate falsy
       * payloads such as 0, "" or false.
       */
      if (data !== undefined && data !== null) {
        this.data = Object.freeze(reducer(this.data, data));
      }
    };
  }

  /** Returns the request carried by the wrapped message, if any. */
  getRequest = (): Request | undefined => this.message.request;

  /**
   * Serializes the request for the websocket.
   *
   * Setting the syncId on the request itself would alter the original
   * object, so the syncId is merged into a copy that only exists in the
   * stringified output. This lets incoming messages be matched to their
   * consumers while the original request stays unchanged.
   *
   * @returns JSON of the form `{ request: { ...request, syncId } }`.
   */
  getStringifiedRequest = (): string => {
    return JSON.stringify({
      request: { ...this.message.request, syncId: this.syncId }
    });
  };

  /**
   * The followers method returns the number
   * of clients subscribed to the stream
   */
  followers = (): number => this.clients.size;

  /**
   * Subscribes a client to the stream. If data has already been
   * accumulated it is delivered to the new client immediately.
   * Subscribing also cancels a pending END, since the stream is no
   * longer idle.
   *
   * @param onUpdate - Called with the accumulated data and loading flag
   * on every update.
   * @param onError - Called with an ApiError for non-404 errors.
   * @returns An unsubscribe function for cleanup.
   */
  subscribe(onUpdate: UpdateHandler, onError: ErrorHandler): () => void {
    const client = new StreamClient(onUpdate, onError);
    this.clients.set(client.id, client);
    this.cancelPendingEnd();
    /**
     * Compare against "no value" rather than truthiness so that
     * falsy accumulated data (0, "", false) is still replayed.
     */
    if (this.data !== undefined && this.data !== null) {
      client.updateFn(this.data, this.loading);
    }
    return () => {
      this.cleanup(client.id);
    };
  }

  /**
   * Removes a client. When this leaves the stream without clients, a
   * single timer is armed; if no client has subscribed by the time it
   * expires, END is fired with this stream as argument.
   *
   * @param id - Id of the client to remove.
   */
  cleanup(id: string) {
    this.clients.delete(id);
    /**
     * Nothing to schedule while clients remain, and a timer that
     * is already pending must not be duplicated: stacked timers
     * would fire END several times and could fire it before the
     * full grace period since the last client left has elapsed.
     */
    if (this.clients.size > 0 || this.endTimer !== undefined) {
      return;
    }
    this.endTimer = setTimeout(() => {
      this.endTimer = undefined;
      if (this.clients.size === 0) {
        /**
         * Need to let any StreamManagers
         * know this stream has no more
         * clients and can be cleaned up.
         */
        this.fireEvent(Stream.END, this);
      }
    }, Stream.END_DELAY_MS);
  }

  /**
   * Handles a message that may carry an error. Messages without an
   * error are ignored. 404 errors are only logged as warnings; every
   * other error is logged and forwarded to all clients as an ApiError.
   *
   * @param msg - The incoming message.
   */
  handleError(msg: Message) {
    const error: Error | undefined = msg.error;
    if (!error) {
      return;
    }
    /**
     * 404 errors are common when it comes
     * to the candidates. this will occur
     * when the candidate config is out of
     * sync with the running config. Anything
     * missing from the running config will
     * not have operational data.
     */
    if (error.httpStatusCode === 404) {
      logger.warn(this.message.request, error);
      return;
    }
    logger.error(this.message.request, error);
    const apiError = new ApiError(error);
    this.clients.forEach((client) => {
      client.errorFn(apiError);
    });
  }

  /**
   * Processes an incoming message: folds it into the accumulated data,
   * updates the message counter and timestamp, then broadcasts the
   * current data and loading flag to every client.
   *
   * @param msg - The incoming message.
   */
  handleMessage(msg: Message) {
    this.parseMessage(msg);
    this.count++;
    this.lastMessage = new Date();
    this.clients.forEach((client) => {
      client.updateFn(this.data, this.loading);
    });
  }

  /** Cancels the pending END timer, if one is scheduled. */
  private cancelPendingEnd(): void {
    if (this.endTimer !== undefined) {
      clearTimeout(this.endTimer);
      this.endTimer = undefined;
    }
  }
}
