import * as Sentry from "@sentry/browser";
import { useQueryClient } from "@tanstack/react-query";
import isNetworkError from "is-network-error";
import { useEffect, useRef } from "react";

import { useAllFlags } from "featureFlags/hooks/useAllFlags";
import { authService } from "services/auth";
import { Database } from "services/databases/useDatabasesNames";
import { WorkspaceEngine } from "services/engines/engine.types";
import { useWorkspaceEngines } from "services/engines/useWorkspaceEngines";
import { ReactQueryKeysAccount } from "services/queryKeys";

import { useDocuments } from "pages/DevelopWorkspace/contexts/DocumentsContext/hooks/useDocuments";
import { FIREBOLT_PROTOCOL_VERSION_HEADER } from "pages/DevelopWorkspace/services/constants";
import cancelQuery from "pages/DevelopWorkspace/services/helpers/cancelQuery";
import executeQueryStatement from "pages/DevelopWorkspace/services/helpers/executeQueryStatement";
import getQueryLabel from "pages/DevelopWorkspace/services/helpers/getQueryLabel";
import getRecentlyStartedQueryByLabel from "pages/DevelopWorkspace/services/helpers/getRecentlyStartedQueryByLabel";
import getRunningQuery from "pages/DevelopWorkspace/services/helpers/getRunningQuery";
import isCancelError from "pages/DevelopWorkspace/services/helpers/isCancelError";
import { withRetry } from "pages/DevelopWorkspace/services/helpers/withRetry";
import {
  DocumentExecutionRuntime,
  FetchOptions,
} from "pages/DevelopWorkspace/services/types";
import {
  CancellationStatus,
  ExecutionType,
  ExplainType,
  QueryStatement,
  QueryStatementStatus,
  WorkspaceDocument,
} from "pages/DevelopWorkspace/workspace.types";

import { useCurrentAccount } from "components/Account/useCurrentAccount";
import { StatusMessageType } from "components/StatusMessageQueue/StatusMessageQueueProvider";
import useStatusMessageQueue from "components/StatusMessageQueue/hooks/useStatusMessageQueue";

import { getStatisticsFromResponse } from "./helpers/getStatisticsFromResponse";
import { useResumeQuery } from "./useResumeQuery";
import { delay, isQueryStatementRunning } from "./utils";

const useDocumentsExecution = () => {
  const {
    state: documentsState,
    actions: {
      updateDocumentQueryStatement,
      changeDocumentContext,
      updateDocumentCancellationStatus,
    },
  } = useDocuments();
  const { putStatusMessage } = useStatusMessageQueue();
  const queryClient = useQueryClient();
  const { data: engines } = useWorkspaceEngines(true);
  const { getAccount } = useCurrentAccount();
  const flags = useAllFlags();

  /**
   * Stores abort controllers and other runtime information for each document.
   * Document exists in this map only if it has pending query statement.
   * One document must have only one pending query statement.
   * If document is not present in this map, it means it's available for execution of the next query statement.
   */
  const documentsExecutionRuntime = useRef<
    Map<string, DocumentExecutionRuntime>
  >(new Map());

  const abortDocumentExecution = (documentId: string) => {
    const documentRuntime = documentsExecutionRuntime.current.get(documentId);

    if (!documentRuntime) {
      return;
    }

    documentRuntime.abortController.abort("Abort query statement execution");
    // clear runtime information when query has been aborted
    documentsExecutionRuntime.current.delete(documentId);
  };

  // Check if running query statement is still relevant (still exists in documents state)
  const checkDocumentsExecutionRelevance = () => {
    const { documents } = documentsState;

    // Check each running document
    documentsExecutionRuntime.current.forEach((documentRuntime, documentId) => {
      const id = documentRuntime.id;

      const document = documents.find(document => document.id === documentId);
      // The document ws probably removed, current fetch is not relevant
      if (!document) {
        abortDocumentExecution(documentId);
        return;
      }

      const documentExecution = document.execution;

      // document execution is not found, and current fetch is not relevant
      if (!documentExecution) {
        abortDocumentExecution(documentId);
        return;
      }

      const queryStatements = documentExecution.queryStatements;

      const queryStatement = queryStatements.find(
        queryStatement => queryStatement.id === id
      );

      // query statement is not found, and current fetch is not relevant
      if (!queryStatement) {
        abortDocumentExecution(documentId);
        return;
      }

      // query statement is cancelled
      if (queryStatement.status === QueryStatementStatus.cancelled) {
        abortDocumentExecution(documentId);
      }
    });
  };

  const clearDocumentExecutionRuntime = (
    documentId: string,
    queryStatementId: string
  ) => {
    const documentRuntime = documentsExecutionRuntime.current.get(documentId);

    if (documentRuntime && documentRuntime.id === queryStatementId) {
      // Clear runtime information. No next progress calls will be performed
      documentsExecutionRuntime.current.delete(documentId);
    }
  };

  const runDocumentCancellation = async (document: WorkspaceDocument) => {
    const runningQueryStatement = document.execution?.queryStatements.find(
      queryStatement => queryStatement.status === QueryStatementStatus.running
    );

    if (!runningQueryStatement || !engines) {
      return;
    }

    if (!runningQueryStatement.serverQueryId) {
      // no serverQueryId yet, cannot cancel
      return;
    }

    const engine = engines.find(
      engine => engine.engineName === document.context.engineName
    );

    if (!engine) {
      console.error("Selected engine does not exist. Cannot cancel query.");
      return;
    }

    updateDocumentCancellationStatus(
      document.id,
      runningQueryStatement.id,
      CancellationStatus.Pending
    );

    try {
      await cancelQuery(
        engine,
        runningQueryStatement.serverQueryId,
        { headers: {}, signal: new AbortController().signal },
        authService
      );
    } catch (error: any) {
      updateDocumentCancellationStatus(
        document.id,
        runningQueryStatement.id,
        CancellationStatus.Failure
      );
      putStatusMessage({
        message: error?.message,
        type: StatusMessageType.Error,
      });

      console.error("Error while cancelling query", error);
    }
  };

  const runDocumentExecution = async (document: WorkspaceDocument) => {
    const documentExecution = document.execution;
    const account = getAccount();

    if (!documentExecution || !account) {
      return;
    }
    const queryStatements = documentExecution.queryStatements;

    if (!queryStatements || !engines) {
      return;
    }

    // get first QueryStatementStatus.Pending query statement only if no QueryStatementStatus.Running or QueryStatementStatus.Error
    const firstPendingQueryStatement = queryStatements.find(
      queryStatement => queryStatement.status === QueryStatementStatus.pending
    );

    const firstUnknownQueryStatement = queryStatements.find(
      queryStatement => queryStatement.status === QueryStatementStatus.unknown
    );

    if (!firstPendingQueryStatement) {
      // nothing to execute. probably all query statements are finished or still running
      return;
    }

    if (firstUnknownQueryStatement) {
      // block execution until previous query status resolved
      return;
    }

    if (
      isQueryStatementRunning(
        documentsExecutionRuntime,
        document.id,
        firstPendingQueryStatement.id
      )
    ) {
      // current query statement is still running
      return;
    }

    const hasRunningQueryStatement = queryStatements.some(
      queryStatement => queryStatement.status === QueryStatementStatus.running
    );

    if (hasRunningQueryStatement) {
      // wait for other query statements to finish
      return;
    }

    const engine = engines.find(
      engine => engine.engineName === document.context.engineName
    );

    if (!engine) {
      console.error("Selected engine does not exist. Cannot execute query.");
      return;
    }

    // used to cancel fetch request (main request and progress calls)
    const abortController = new AbortController();

    // IMPORTANT: don't rely on document state here.
    // It has old documents state. Perform all checks if needed in documents reducer

    const fetchOptions: FetchOptions = {
      signal: abortController.signal,
      headers: {
        "Content-Type": "application/json",
        [FIREBOLT_PROTOCOL_VERSION_HEADER]: "2.1",
      },
    };

    executeQueryStatement(
      firstPendingQueryStatement,
      document,
      engine,
      fetchOptions,
      authService,
      flags
    )
      .then(async ({ responseBody, updateParameters }) => {
        if (updateParameters.settings) {
          // apply settings (flags) if they are present
          changeDocumentContext(document.id, {
            settings: {
              ...updateParameters.settings,
            },
          });
        }

        /*
         * Example of blocking operation:
         * 1. CREATE DATABASE new_database;
         * 2. USE DATABASE new_database; <-- it blocks the execution of the next query statement.
         * It waits for new_database to be available in context selector (in the DBs dropdown).
         * Only after that, it switches the status to success and the next query statement can be executed.
         * 3. SELECT 1;
         */
        const hasBlockingUpdateParameters =
          updateParameters.engine || updateParameters.database;

        const queryStatementUpdate: Partial<QueryStatement> = {
          statistics: getStatisticsFromResponse(responseBody),
        };

        if (!hasBlockingUpdateParameters) {
          // if there are no blocking update parameters, we can show actual result to the user
          // otherwise, we show only latest statistics. The results will be set later after engine/database change (successfully applied USE statement)
          queryStatementUpdate.status = isCancelError(responseBody)
            ? QueryStatementStatus.cancelled
            : QueryStatementStatus.success;
          queryStatementUpdate.result = {
            data: responseBody.data,
            meta: responseBody.meta,
            rows: responseBody.rows,
          };

          if (responseBody.explain_analyze) {
            queryStatementUpdate.explain = {
              [ExplainType.Analyze]: responseBody.explain_analyze,
            };
            queryStatementUpdate.explainType = ExplainType.Analyze;
          }

          if (
            document.execution?.executionType === ExecutionType.Explain ||
            firstPendingQueryStatement.executionType === ExecutionType.Explain
          ) {
            const fetchedExplains = (
              responseBody.data as { explain: string }[]
            ).reduce<Record<string, unknown>>((acc, row) => {
              try {
                const parsed = JSON.parse(row.explain);
                const { explain_type } = parsed;
                acc[explain_type] = parsed;
              } catch (error) {
                return acc;
              }
              return acc;
            }, {});
            queryStatementUpdate.explain = {
              ...(firstPendingQueryStatement.explain || {}),
              ...fetchedExplains,
            };
          }
        }

        if (queryStatementUpdate.status === QueryStatementStatus.cancelled) {
          updateDocumentCancellationStatus(
            document.id,
            firstPendingQueryStatement.id,
            CancellationStatus.Cancelled
          );
        }

        updateDocumentQueryStatement(
          document.id,
          firstPendingQueryStatement.id,
          queryStatementUpdate
        );

        clearDocumentExecutionRuntime(
          document.id,
          firstPendingQueryStatement.id
        );

        if (updateParameters.engine) {
          let attempts = 0;

          // system engine does not need to be checked (it's always available)
          if (updateParameters.engine !== "system") {
            while (
              !queryClient
                .getQueryData<
                  WorkspaceEngine[]
                >([ReactQueryKeysAccount.workspaceEngines])
                ?.find(engine => engine.engineName === updateParameters.engine)
            ) {
              if (attempts > 5) {
                throw new Error(
                  `Could not use engine. The engine '${updateParameters.engine}' does not exist or is not authorized. Try again with a different engine name.`
                );
              }

              queryClient.invalidateQueries({
                queryKey: [ReactQueryKeysAccount.workspaceEngines],
              });

              // eslint-disable-next-line no-await-in-loop
              await delay(500); // TODO maybe remove
              attempts++;
            }
          }

          changeDocumentContext(document.id, {
            engineName: updateParameters.engine,
            settings: {}, // reset all setting since the engine was changed
          });

          updateDocumentQueryStatement(
            document.id,
            firstPendingQueryStatement.id,
            {
              statistics: getStatisticsFromResponse(responseBody),
              status: QueryStatementStatus.success,
              result: {
                data: responseBody.data,
                meta: responseBody.meta,
                rows: responseBody.rows,
              },
            }
          );
        }

        if (updateParameters.database) {
          // databasesNames

          let attempts = 0;
          while (
            !queryClient
              .getQueryData<Database[]>([ReactQueryKeysAccount.databasesNames])
              ?.find(
                database => database.catalogName === updateParameters.database
              )
          ) {
            if (attempts > 5) {
              throw new Error(
                `Could not use database. The database '${updateParameters.database}' does not exist or is not authorized. Try again with a different database name.`
              );
            }
            queryClient.invalidateQueries({
              queryKey: [ReactQueryKeysAccount.databasesNames],
            });
            // eslint-disable-next-line no-await-in-loop
            await delay(500); // TODO maybe remove

            attempts++;
          }

          changeDocumentContext(document.id, {
            databaseName: updateParameters.database,
          });

          updateDocumentQueryStatement(
            document.id,
            firstPendingQueryStatement.id,
            {
              statistics: getStatisticsFromResponse(responseBody),
              status: QueryStatementStatus.success,
              result: {
                data: responseBody.data,
                meta: responseBody.meta,
                rows: responseBody.rows,
              },
            }
          );
        }
      })
      .catch(error => {
        if (isNetworkError(error)) {
          updateDocumentQueryStatement(
            document.id,
            firstPendingQueryStatement.id,
            {
              status: QueryStatementStatus.unknown,
            }
          );
        } else {
          updateDocumentQueryStatement(
            document.id,
            firstPendingQueryStatement.id,
            {
              status: QueryStatementStatus.error,
              error: error.message,
            }
          );
        }

        clearDocumentExecutionRuntime(
          document.id,
          firstPendingQueryStatement.id
        );
      });

    // switch query statement status to running
    updateDocumentQueryStatement(document.id, firstPendingQueryStatement.id, {
      status: QueryStatementStatus.running,
    });

    documentsExecutionRuntime.current.set(document.id, {
      id: firstPendingQueryStatement.id,
      abortController,
      progressFetchPaused: false,
    });

    if (engine.engineName === "system") {
      // skip progress calls
      return;
    }

    await delay(500); // this delay prevents starting query progress call before query is actually started

    // check if query is still running (relevant for fast queries less than 500ms)
    if (
      !isQueryStatementRunning(
        documentsExecutionRuntime,
        document.id,
        firstPendingQueryStatement.id
      )
    ) {
      // query is probably finished or cancelled
      // no need to start querying progress
      return;
    }

    const queryLabel = getQueryLabel(firstPendingQueryStatement, document);

    const retries = 5;
    const retryDelayMs = 1000;

    try {
      const runningQuery = await withRetry(
        getRecentlyStartedQueryByLabel,
        [engine, queryLabel, fetchOptions, authService],
        retries,
        retryDelayMs
      );

      // while main query is running
      while (
        isQueryStatementRunning(
          documentsExecutionRuntime,
          document.id,
          firstPendingQueryStatement.id
        )
      ) {
        // eslint-disable-next-line no-await-in-loop
        await delay(200); // too many network requests without delay

        if (
          documentsExecutionRuntime.current.get(document.id)
            ?.progressFetchPaused
        ) {
          // eslint-disable-next-line no-await-in-loop
          await delay(1000); // wait 1 second until next check
        } else {
          try {
            // fetching query progress
            // eslint-disable-next-line no-await-in-loop -- it's ok here
            const runningQueryByServerId = await getRunningQuery(
              engine,
              runningQuery.queryId,
              fetchOptions,
              authService
            );

            updateDocumentQueryStatement(
              document.id,
              firstPendingQueryStatement.id,
              {
                serverQueryId: runningQuery.queryId,
                statistics: {
                  executionTimeSec:
                    runningQueryByServerId.durationUsec / 1000000,
                  bytesRead: runningQueryByServerId.scannedBytes,
                  rowsRead: runningQueryByServerId.scannedRows,
                },
              }
            );
          } catch (e) {
            if (
              !isQueryStatementRunning(
                documentsExecutionRuntime,
                document.id,
                firstPendingQueryStatement.id
              )
            ) {
              // query is probably finished or cancelled, it's expected
              // don't show error
              return;
            }

            console.log(
              "Cannot get query progress. Running query is not found by query_id.",
              e
            );
            // it has been cancelled or finished
          }
        }
      }
    } catch (e) {
      if (
        !isQueryStatementRunning(
          documentsExecutionRuntime,
          document.id,
          firstPendingQueryStatement.id
        )
      ) {
        // query is probably finished or cancelled, it's expected
        // don't show error
        return;
      }

      console.log(
        "Cannot start querying progress. Running query is not found by label",
        e
      );

      Sentry.captureException(e, {
        extra: {
          engineName: engine.engineName,
          engineStatus: engine.status,
          engineLastStarted: engine.lastStarted,
          queryStatementId: firstPendingQueryStatement.id,
          queryLabel,
          retries,
          retryDelayMs,
        },
      });
    }
  };

  useEffect(() => {
    const { documents } = documentsState;

    // check inconsistency between documents state and documentsExecutionRuntime
    checkDocumentsExecutionRelevance();

    // triggers on each documents state change
    for (const document of documents) {
      // check for each document if it has to start execution or execution to be continued, or do nothing
      runDocumentExecution(document);
    }

    // eslint-disable-next-line react-hooks/exhaustive-deps -- should react on any documents state change
  }, [documentsState]);

  useResumeQuery({ documentsExecutionRuntime });

  useEffect(() => {
    const { documents } = documentsState;

    // triggers on each documents state change
    for (const document of documents) {
      if (
        document.execution?.cancellationStatus === CancellationStatus.Initiated
      ) {
        runDocumentCancellation(document);
      }
    }

    // eslint-disable-next-line react-hooks/exhaustive-deps -- should react on any documents state change
  }, [documentsState]);

  useEffect(() => {
    // set fetch paused for all running documents except the active one

    for (const [
      documentId,
      documentRuntime,
    ] of documentsExecutionRuntime.current) {
      if (documentId !== documentsState.activeDocumentId) {
        documentRuntime.progressFetchPaused = true;
      } else {
        documentRuntime.progressFetchPaused = false;
      }
    }
  }, [documentsState.activeDocumentId]);

  useEffect(() => {
    const runtime = documentsExecutionRuntime.current;

    return () => {
      // cancel all running fetch requests when component is unmounted
      runtime.forEach(documentRuntime => {
        documentRuntime.abortController.abort(
          "Abort all query statement executions"
        );
      });
    };
    // eslint-disable-next-line react-hooks/exhaustive-deps -- should run only on component unmount
  }, []);
};

export default useDocumentsExecution;
