import { JSONParser } from "@streamparser/json";
import { QUERY_OUTPUT } from "types/outputFormat";
import {
  hasOnlyComments,
  isSetStatement,
} from "utils/helpers/Query/splitSQLQuery";
import { stripComments } from "utils/helpers/Query/strip-comments/strip";

import { FeatureFlag } from "featureFlags/constants";
import { useAllFlags } from "featureFlags/hooks/useAllFlags";
import { AuthService } from "services/auth";
import { WorkspaceEngine } from "services/engines/engine.types";
import { normalizeResponse } from "services/query/helpers";

import { NONE } from "pages/DevelopWorkspace/Editor/Document/DocumentContextSelector/types";
import {
  FIREBOLT_UPDATE_ENDPOINT_HEADER,
  FIREBOLT_UPDATE_PARAMETER_HEADER,
  SYSTEM_QUERY_PARAMETERS,
} from "pages/DevelopWorkspace/services/constants";
import { buildEngineUrl } from "pages/DevelopWorkspace/services/helpers/buildEngineUrl";
import { EmptyBodyError } from "pages/DevelopWorkspace/services/helpers/executionErrors/EmptyBodyError";
import getQueryLabel from "pages/DevelopWorkspace/services/helpers/getQueryLabel";
import getSystemParamErrorMessage from "pages/DevelopWorkspace/services/helpers/getSystemParamErrorMessage";
import { FetchOptions } from "pages/DevelopWorkspace/services/types";
import {
  ExecutionType,
  ExplainType,
  QueryStatement,
  WorkspaceDocument,
} from "pages/DevelopWorkspace/workspace.types";

import { parseQuerySetting } from "components/utils/queries";

import { ServerError } from "./executionErrors/ServerError";
import { TooLargeResponseError } from "./executionErrors/TooLargeResponseError";
import { isCancelMessage } from "./isCancelError";

/** Largest response body, in megabytes, that is read before the query is aborted. */
const MAX_RESPONSE_SIZE_MB = 100;

/** The same limit expressed in bytes, for comparison against the streamed byte count. */
const maxResponseSizeBytes = MAX_RESPONSE_SIZE_MB * 1024 ** 2;

/**
 * Applies the context changes announced by the server's update headers to a
 * copy of `sourceParams`.
 *
 * @param fireboltUpdateParameters - Comma-separated `key=value` pairs. Only the
 *   `database` pair is read; everything else is ignored.
 * @param fireboltUpdateEndpoint - Endpoint without a scheme (`https://` is
 *   prepended before parsing). Its `engine` query parameter, when present and
 *   non-empty, becomes the new engine.
 * @param sourceParams - Parameters to start from. Never mutated.
 * @returns A shallow copy of `sourceParams` in which `database` and `engine`
 *   are overridden only when the corresponding header actually provides them.
 * @throws TypeError when `fireboltUpdateEndpoint` is non-empty but cannot be
 *   parsed as a URL.
 */
const createUpdateParameters = (
  fireboltUpdateParameters: string,
  fireboltUpdateEndpoint: string,
  sourceParams: {
    engine?: string;
    database?: string;
    settings?: {
      [key: string]: string;
    };
  }
) => {
  const parsedParams = new Map<string, string>();

  for (const pair of (fireboltUpdateParameters || "").split(",")) {
    // Split on the FIRST "=" only, so values that contain "=" stay intact.
    const separatorIndex = pair.indexOf("=");

    if (separatorIndex > 0) {
      // Pairs may carry surrounding whitespace, e.g. when several header
      // values were joined with ", ".
      parsedParams.set(
        pair.slice(0, separatorIndex).trim(),
        pair.slice(separatorIndex + 1).trim()
      );
    }
  }

  const newParams = {
    ...sourceParams,
  };

  // Only override the database when the header really carries one; otherwise
  // an endpoint-only update would set `database` to `undefined`.
  const database = parsedParams.get("database");

  if (database !== undefined) {
    newParams.database = database;
  }

  if (fireboltUpdateEndpoint) {
    const url = new URL("https://" + fireboltUpdateEndpoint);
    const engine = url.searchParams.get("engine");

    if (engine) {
      newParams.engine = engine;
    }
  }

  return newParams;
};

/**
 * Converts a SET statement into the setting it carries plus a placeholder
 * query to send in its place.
 *
 * @param query - SET statement text; the caller strips comments beforehand.
 * @returns The parsed `key` / `value` and a SQL comment (`content`) describing
 *   the setting being validated.
 * @throws Error when the statement yields no key or no value, or when the key
 *   is listed in `SYSTEM_QUERY_PARAMETERS`.
 */
const processSetStatement = (
  query: string
): {
  content: string;
  key: string;
  value: string;
} => {
  const parsed = parseQuerySetting(query);
  const key = parsed?.key;
  const value = parsed?.value;

  if (!key || !value) {
    throw new Error("Invalid SET statement");
  }

  // system parameters are managed by the application and cannot be set by hand
  if (SYSTEM_QUERY_PARAMETERS.includes(key)) {
    throw new Error(getSystemParamErrorMessage(key));
  }

  return {
    content: `-- Validation set ${key} = ${value}`,
    key,
    value,
  };
};

/**
 * Params (engine, database, settings) that should be updated in the document
 * context after a statement has been executed.
 */
type QueryUpdateParameters = {
  engine?: string;
  database?: string;
  settings?: {
    [key: string]: string;
  };
};

/**
 * Streams the response body through the incremental JSON parser and returns
 * the top-level value.
 *
 * @param response - Fetch response whose body is consumed.
 * @param abortController - Controller of the request; aborted when the body
 *   grows past `maxResponseSizeBytes`.
 * @returns The parsed top-level JSON value, or `undefined` when the response
 *   has no body or the parser never emitted a top-level value.
 * @throws TooLargeResponseError when more than `maxResponseSizeBytes` bytes
 *   have been received. Read and parse failures propagate unchanged.
 */
const readJsonBody = async (
  response: Response,
  abortController: FetchOptions["abortController"]
): Promise<any> => {
  const parser = new JSONParser();

  let json: any;

  parser.onValue = parsedElementInfo => {
    // nested values have a parent; only the top-level value is kept
    if (!parsedElementInfo.parent) {
      json = parsedElementInfo.value;
    }
  };

  if (!response.body) {
    return json;
  }

  const reader = response.body.getReader();
  let bytesReceived = 0;
  let done = false;

  while (!done) {
    // eslint-disable-next-line no-await-in-loop -- we need to wait for the next chunk
    const { done: isDone, value } = await reader.read();
    done = isDone;

    if (value) {
      bytesReceived += value.length;

      if (bytesReceived > maxResponseSizeBytes) {
        // stop the transfer before giving up on the result
        abortController.abort();
        throw new TooLargeResponseError(
          `Query results exceed the ${MAX_RESPONSE_SIZE_MB}MB limit. Use the LIMIT clause to reduce the result size.`
        );
      }

      parser.write(value);
    }
  }

  return json;
};

/**
 * Executes a single statement against the engine and returns the normalized
 * result together with the context parameters the document should adopt.
 *
 * - SET statements are not sent as-is: a validation comment is sent instead,
 *   with the new setting added to the query parameters. The merged settings
 *   are reported back through `updateParameters.settings`.
 * - When explain execution is requested (on the document or the statement),
 *   non-SET statements are prefixed with `explain (format json, <type>)`.
 * - Engine/database changes announced through the update headers of the
 *   response are merged into `updateParameters`.
 *
 * @param queryStatement - Statement to execute.
 * @param document - Document supplying the context (settings, database) and
 *   the execution type.
 * @param engine - Engine whose `url` receives the request.
 * @param fetchOptions - Extra request headers and the abort controller.
 * @param authService - Provides the bearer token.
 * @param flags - Feature flags; `FireboltAppEnableQueryProfile` adds the
 *   explain-analyze output parameter for non-explain executions.
 * @returns The normalized response and the parameters to update. An empty
 *   result is returned when the body of a successful response cannot be read
 *   or parsed (including SET / comment-only statements that produce no body),
 *   and a one-row "canceled" result when the server reports a cancellation.
 * @throws Error when no auth token is available, when the server reports an
 *   error, or when the response status is not OK.
 * @throws TooLargeResponseError when the body exceeds the size limit.
 * @throws EmptyBodyError when a regular statement produced no body.
 */
const executeQueryStatement = async (
  queryStatement: QueryStatement,
  document: WorkspaceDocument,
  engine: WorkspaceEngine,
  fetchOptions: FetchOptions,
  authService: AuthService,
  flags?: ReturnType<typeof useAllFlags>
): Promise<{
  responseBody: any;
  updateParameters: QueryUpdateParameters;
}> => {
  const token = await authService.getToken();

  if (!token) {
    throw new Error("Auth token is not available");
  }

  const settings = { ...document.context.settings };

  // list of params (flags, settings, etc.) that should be updated in document context
  const updateParameters: QueryUpdateParameters = {};

  let contentToExecute = queryStatement.content;

  if (isSetStatement(queryStatement.content)) {
    // SET statement processing function is not supporting comments, so we need to strip them
    const { content, key, value } = processSetStatement(
      stripComments(queryStatement.content)
    );

    contentToExecute = content;
    settings[key] = value;

    // if validation query executes successfully, the flag will be added to the settings
    updateParameters.settings = {
      ...settings,
    };
  }

  const getDatabase = () => {
    if (
      !document.context.databaseName ||
      document.context.databaseName === NONE
    ) {
      return "";
    }
    return document.context.databaseName as string;
  };

  // params for current execution
  const queryParameters = {
    ...settings,
    query_label: getQueryLabel(queryStatement, document),
    database: getDatabase(),
    output_format: QUERY_OUTPUT.JSON_COMPACT_LIMITED,
    cancel_query_on_connection_drop: "type_dependent",
  };

  if (
    !isSetStatement(queryStatement.content) &&
    (document.execution?.executionType === ExecutionType.Explain ||
      queryStatement.executionType === ExecutionType.Explain)
  ) {
    const type = queryStatement.explainType ?? ExplainType.Physical;
    contentToExecute = `explain (format json, ${type}) ${contentToExecute}`;
  } else if (flags?.[FeatureFlag.FireboltAppEnableQueryProfile]) {
    Object.assign(queryParameters, {
      append_explain_analyze_to_json_output_format: true,
    });
  }

  const engineURL = new URL(`https://${engine.url}`);
  const url = buildEngineUrl(engineURL, queryParameters);

  const { headers, abortController } = fetchOptions;
  const response = await fetch(url, {
    signal: abortController.signal,
    method: "POST",
    headers: {
      ...headers,
      Authorization: `Bearer ${token}`,
    },
    body: contentToExecute,
  });

  // extract firebolt update parameters when USE statement is used
  const fireboltUpdateParameters =
    response.headers.get(FIREBOLT_UPDATE_PARAMETER_HEADER) || "";

  const fireboltUpdateEndpoint =
    response.headers.get(FIREBOLT_UPDATE_ENDPOINT_HEADER) || "";

  const newUpdateParams =
    fireboltUpdateParameters || fireboltUpdateEndpoint
      ? createUpdateParameters(
          fireboltUpdateParameters,
          fireboltUpdateEndpoint,
          updateParameters
        )
      : updateParameters;

  if (response.ok) {
    // Set right before it is thrown, so the catch block can tell this
    // deliberate error apart from read/parse failures (which are replaced by
    // an empty result) and let it reach the caller.
    let unknownServerError: Error | undefined;

    try {
      const json = await readJsonBody(response, abortController);

      if (json === undefined) {
        if (
          isSetStatement(queryStatement.content) ||
          hasOnlyComments(queryStatement.content)
        ) {
          // intentionally handled by the generic fallback in the catch block
          throw new Error(
            "Response is empty. Should be replaced with a placeholder."
          );
        } else {
          throw new EmptyBodyError(
            "Unexpected empty response body. Query status restoration required."
          );
        }
      }

      const errorMessage = json?.errors?.[0]?.description;

      if (errorMessage) {
        throw new ServerError(errorMessage);
      }

      if (json?.errors) {
        unknownServerError = new Error("Unknown error occurred.");
        throw unknownServerError;
      }

      const normalizedResponse = normalizeResponse({
        response: json,
        querySettings: queryParameters,
        responseStatusCode: response.status,
      });

      return {
        responseBody: normalizedResponse,
        updateParameters: newUpdateParams,
      };
    } catch (e: unknown) {
      if (
        e instanceof EmptyBodyError ||
        e instanceof TooLargeResponseError ||
        (unknownServerError !== undefined && e === unknownServerError)
      ) {
        throw e;
      }

      if (e instanceof ServerError) {
        const isCancel = isCancelMessage(e.message);
        if (!isCancel) {
          throw new Error(e.message);
        }

        // a cancellation is reported as a regular one-row result
        const normalizedResponse = normalizeResponse({
          response: {
            meta: [
              {
                name: "No",
                type: "",
                displayName: "No",
              },
              {
                name: "",
                type: "string",
              },
            ],
            data: [[1, "Query execution has been canceled."]],
            statistics: {},
            errors: [{ description: e.message }],
          },
          querySettings: queryParameters,
          responseStatusCode: response.status,
        });

        return {
          responseBody: normalizedResponse,
          updateParameters: newUpdateParams,
        };
      }

      // mock response if it cannot be parsed
      const normalizedResponse = normalizeResponse({
        response: {
          meta: [],
          data: [],
          statistics: {},
        },
        querySettings: queryParameters,
        responseStatusCode: response.status,
      });

      return {
        responseBody: normalizedResponse,
        updateParameters: newUpdateParams,
      };
    }
  }

  // Non-OK response: prefer the server's error description when the body is
  // JSON and carries one.
  try {
    // clone so the body can still be read as text below
    const error = await response.clone().json();
    const errorMessage = error?.errors?.[0]?.description;

    if (errorMessage) {
      throw new ServerError(errorMessage);
    }
  } catch (e) {
    if (e instanceof ServerError) {
      throw new Error(e.message);
    }
    // body is not JSON; fall through to the raw text
  }

  // No usable description: report the raw body, or the status when the body
  // is empty, so the error never has a blank message.
  const text = await response.text();
  throw new Error(text || `Request failed with status ${response.status}`);
};

export default executeQueryStatement;
