import cloneDeep from 'lodash/cloneDeep'
import debounce from 'lodash/debounce'
import AppSettings from '@app/AppSettings'
import {
  ASSET_CONNECTION_TYPES,
  CUSTOMER_ID_COLUMN,
  DATA_ASSET_TYPES,
  DEFAULT_CSV_SEPARATOR,
  DataType,
  OFFERFIT_RECEIVED_TS_COLUMN,
  REQUEST_DEBOUNCE_DELAY,
  ROLE_TYPE,
  TASK_NAME,
} from '@shared/data/constants'
import { fileExtensionsMap } from '@shared/data/mapAndLists'
import { store } from '@app/store'
import type {
  BucketFileInfo,
  ColumnMappingPayload,
  DataAssetFormat,
  DataAssetWrapper,
  IDataAsset,
  NullString,
  NullStringObject,
  PipelineUpdateContext,
  SelectItem,
  StoreDataChangePayload,
} from '@shared/utils/Types'
import Global from '@shared/utils/global'
import {
  deepToRaw,
  getDataTypeByDType,
  getFieldValueOrDefault,
  getSortedObjectByExample,
  isEmpty,
  isEqual,
  mergeDeep,
  validatePathString,
  validatePathStringDate,
} from '@shared/utils/helpers'
import { escapeTabs, slugString, unescapeTabs } from '@shared/utils/transformHelpers'
import { createReactiveDataAssetStateObject } from '@/data-assets/utils/createDataAssetStateObject.ts'
import type { BucketFileInfoType, UseAssetDataOptions } from '@/data-assets/utils/types.ts'
import type { PipelineConfigReactive } from '@/pipelines/composables/usePipelineConfig'
import type { MetadataModel } from '@/pipelines/models/server/PipelineModel'

import { getDataAssetTemplate, useAssetFieldsMetadata, usePipelineConfig } from '@/data-assets/utils/config'
import type { EngineerFeaturesReactive } from '@/usecases/composables/useEngineeringFeatures'
import { useEngineeringFeatures } from '@/usecases/composables/useEngineeringFeatures'
import { useMetadataChangeModal } from '@/data-assets/composables/useMetadataChangeModal'
import { LATEST_VERSION, metadataVersioning } from '@/data-assets/utils/version'
import type { defaultCreateFeatureWrapper } from '@/data-assets/models/FeaturesModel'
import type { AssetFieldMetadataModel, AssetFieldValidationRule, AssetMetadataModel } from '@/data-assets/models/server/AssetMetadataModel'
import type { AssetDTypes, BucketAssetMetadata } from '@/data-assets/models/server/BucketAssetMetadata'
import AssetDataService from '@/data-assets/services/AssetDataService'
import SampleDataService from '@/data-assets/services/SampleDataService'

/** Value/label pairs for the supported sample data formats ('zip' is currently commented out). */
const sampleDataFormatsOptions = [
  { value: 'csv', label: 'csv' },
  { value: 'csv.gz', label: 'csv.gz' },
  { value: 'parquet', label: 'parquet' },
  { value: 'mparticle', label: 'mparticle' },
  //  { value: 'zip', label: 'zip' }
]

/** Message written to the file-path-format error holder when the path/date validators reject the format. */
const FILE_PATH_FORMAT_ERROR = 'Incorrect file path formatting. Make sure that month is represented as %m and not %M.'

/** Sentinel value of the 'None' entry in the column select options. */
const COLUMN_NONE = 'none'

/** Shape of the reactive state object returned by createReactiveDataAssetStateObject. */
type AssetDataStateType = ReturnType<typeof createReactiveDataAssetStateObject>

// Module-level state shared by every useAssetData() call that does not pass its own
// `defaultStateObject`; created lazily on the first call.
let globalState: AssetDataStateType | undefined

export function useAssetData(
  options: UseAssetDataOptions = {},
) {
  // Unpack the composable options; each entry may be undefined because `options` defaults to {}.
  const { autoload, autosave, defaultStateObject, onAssetCreated, onAssetLoaded, readOnly } = options

  // Create the shared module-level state the first time the composable is used.
  if (!globalState) {
    globalState = createReactiveDataAssetStateObject()
  }

  // A caller-supplied state object takes precedence over the shared module-level state.
  const _currentState: AssetDataStateType = defaultStateObject || globalState

  // Read-only when the caller asks for it or when the store reports read-only mode.
  _currentState.readOnly = readOnly || store.getters.isReadonlyMode

  // Pipeline-config composable. The boolean argument is true when no `defaultStateObject`
  // was supplied; its exact meaning is defined by usePipelineConfig (not visible here).
  const pipelineConfig: PipelineConfigReactive = usePipelineConfig(!defaultStateObject)

  // Members of the pipeline config that this composable reads or calls.
  const {
    addOrUpdateDataAsset,
    getPipelinePreprocessConfig,
    filteringFeatureAssetsForCurrentAsset,
    dataAssetWrappers,
    currentDataAssetWrapper,
    dropColumns,
    renameColumns,
    descriptionColumns,
    primaryKeyColumns,
    partitionColumn,
    joinedDataAssetColumns,
    currentDataAssetKey,
    immutableAsset,
    loadAllTypeProcessedDataRows,
  } = pipelineConfig

  // Pipeline metadata accessor; read when requesting column metadata for a bucket file.
  const { getPipelineMetadata } = useAssetFieldsMetadata()

  // Computed
  // Key of the currently selected data asset wrapper; undefined when there is none.
  const assetSlugName = computed<string | undefined>(() => {
    const selectedWrapper = currentDataAssetWrapper?.value
    return selectedWrapper?.key
  })

  // Slug derived from the display name currently entered in the form state.
  const assetSlugDisplayName = computed<string>(() => {
    const displayName = _currentState.assetDisplayName
    return slugString(displayName)
  })

  // Truthy while a request is pending, a file is being uploaded for parsing,
  // or a bucket file is being parsed.
  const isDataAssetFormBusy = computed<boolean>(() => {
    return _currentState.isDataAssetRequestPending
      || _currentState.isUploadingToParse
      || _currentState.isParsingFromBucket
  })

  // File info to work with: the dataset file when present, otherwise the sample data file.
  const targetFileInfo: ComputedRef<BucketFileInfoType> = computed<BucketFileInfoType>(
    () => _currentState.datasetFileInfo || _currentState.sampleDataFileInfo,
  )

  // True when the relevant file info is exactly `null`: the dataset file info when a
  // file path format is set, the sample data file info otherwise.
  const isFileCannotBeFound = computed<boolean>(() => {
    if (_currentState.assetFilePathFormat) {
      return _currentState.datasetFileInfo === null
    }
    return _currentState.sampleDataFileInfo === null
  })

  // Column names of the asset: the received-timestamp column first, then the loaded headers (if any).
  const dataAssetColumns = computed<string[]>(() => {
    const loadedHeaders = haveHeadersLoaded.value ? (_currentState.assetDataHeaders || []) : []
    return [OFFERFIT_RECEIVED_TS_COLUMN, ...loadedHeaders]
  })

  // True when the state holds a non-empty headers value.
  const haveHeadersLoaded = computed<boolean>(
    () => !isEmpty(_currentState.assetDataHeaders),
  )

  // Select options for the asset columns, always preceded by a 'None' entry.
  const dataAssetColumnsOptions = computed<SelectItem[]>(() => {
    const noneOption = { label: 'None', value: COLUMN_NONE } as SelectItem
    const columnOptions = dataAssetColumns.value.map(
      (assetColumn: string) => ({ label: assetColumn, value: assetColumn } as SelectItem),
    )
    return [noneOption, ...columnOptions]
  })

  // User identifier column stored in the features mapping of the current asset.
  const featuresMappingUserIdentifierColumn = computed<NullString>(
    () => filteringFeatureAssetsForCurrentAsset.value?.mapping?.user_identifier_col,
  )

  // -- Methods --

  /**
   * Validates the asset form values held in the current state.
   * All checks always run; each failing check writes its message to the matching error holder.
   *
   * @returns true when every check passed, false otherwise.
   */
  const validateFields = (): boolean => {
    const displayName = _currentState.assetDisplayName
    const displayNameSlug = slugString(displayName)
    const originalWrapper = _currentState.assetWrapperOriginalCopy
    const pathFormat = _currentState.assetFilePathFormat
    let isValid = true

    if (!displayName) {
      _currentState.assetDisplayNameError.value = 'An asset display name is required'
      isValid = false
    }

    if (!_currentState.assetDescription) {
      _currentState.assetDescriptionError.value = 'An asset description is required'
      isValid = false
    }

    // The name only needs a uniqueness check when it differs from the originally loaded asset
    // (both by key and by display name).
    const differsFromOriginal = originalWrapper?.key !== displayNameSlug
      && originalWrapper?.asset?.display_name !== displayName
    if (differsFromOriginal && dataAssetWrappers.value.some((item: DataAssetWrapper) => item.key === displayNameSlug)) {
      _currentState.assetDisplayNameError.value = 'An asset name must be unique'
      isValid = false
    }

    if (!_currentState.assetDataFormat) {
      _currentState.assetDataFormatError.value = 'An asset data format is required'
      isValid = false
    }

    if (!_currentState.assetType) {
      _currentState.assetTypeError.value = '*Asset type selection is required'
      isValid = false
    }

    // An empty path format is allowed; a non-empty one must pass both validators.
    if (pathFormat && !(validatePathString(pathFormat) && validatePathStringDate(pathFormat))) {
      _currentState.assetFilePathFormatError.value = FILE_PATH_FORMAT_ERROR
      isValid = false
    }

    if (_currentState.user_identifier_col === undefined) {
      _currentState.userIdentifierColumnError.value = 'A user identifier column is required'
      isValid = false
    }

    return isValid
  }

  /** Resets every form value, file-info entry and field error in the current state to its initial value. */
  const clearForm = (): void => {
    // Text inputs and flags
    _currentState.assetDisplayName = ''
    _currentState.assetDescription = ''
    _currentState.assetCpuLimit = ''
    _currentState.assetMemoryLimit = ''
    _currentState.assetCSVSeparator = DEFAULT_CSV_SEPARATOR
    _currentState.assetRequired = false
    _currentState.assetFilePathFormat = ''
    _currentState.assetFilePathFormatInput = ''
    _currentState.parsedFileName = null

    // Selections and loaded data
    _currentState.assetDataFormat = undefined
    _currentState.assetPartitionColumn = undefined
    _currentState.assetDataHeaders = undefined
    _currentState.sampleDataFileInfo = undefined
    _currentState.datasetFileInfo = undefined
    _currentState.foundDatasetFileInfo = undefined
    _currentState.assetWrapperOriginalCopy = undefined
    _currentState.assetType = undefined
    _currentState.sampleUploadOrBucketConnection = undefined

    // Column mapping
    _currentState.user_identifier_col = undefined
    _currentState.event_time_col = OFFERFIT_RECEIVED_TS_COLUMN

    // Field errors
    const errorHolders = [
      _currentState.assetDisplayNameError,
      _currentState.assetDescriptionError,
      _currentState.assetDataFormatError,
      _currentState.assetFilePathFormatError,
      _currentState.userIdentifierColumnError,
      _currentState.eventTimeColumnError,
      _currentState.assetTypeError,
    ]
    errorHolders.forEach((holder) => {
      holder.value = null
    })
  }

  /**
   * Copies the currently selected data asset into the form state and stores a deep,
   * non-reactive copy of the wrapper as the "original" reference.
   */
  const setAssetValues = (): void => {
    const wrapper = currentDataAssetWrapper.value
    const asset = wrapper?.asset
    const fileNameFormat = asset?.file_name_format

    _currentState.assetWrapperOriginalCopy = structuredClone(
      deepToRaw(wrapper || { asset: {}, key: '' } as DataAssetWrapper),
    )

    _currentState.assetDisplayName = asset?.display_name || ''
    _currentState.assetDescription = asset?.description || ''
    _currentState.assetCpuLimit = asset?.cpu_limit || ''
    _currentState.assetMemoryLimit = asset?.memory_limit || ''
    _currentState.assetCSVSeparator = unescapeTabs(asset?.separator) || DEFAULT_CSV_SEPARATOR
    _currentState.assetDataFormat = (asset?.data_format as DataAssetFormat) || undefined
    _currentState.assetPartitionColumn = asset?.partition_col
    // The form stores "required", the asset stores "optional".
    _currentState.assetRequired = !asset?.optional

    // Both the committed value and the input field start from the stored format.
    _currentState.assetFilePathFormat = fileNameFormat || ''
    _currentState.assetFilePathFormatInput = _currentState.assetFilePathFormat

    _currentState.assetType = asset?.asset_type

    // An asset with a file name format is bucket-connected; otherwise it uses a sample upload.
    if (fileNameFormat) {
      _currentState.sampleUploadOrBucketConnection = ASSET_CONNECTION_TYPES.BUCKET
    }
    else {
      _currentState.sampleUploadOrBucketConnection = ASSET_CONNECTION_TYPES.SAMPLE
    }
  }

  /**
   * Initializes the user identifier and event time selections from the feature mapping.
   * A `null` original identifier maps to the 'None' option; an identifier that does not
   * exist in the asset leaves the selection undefined.
   */
  const setColumnDependedValues = (): void => {
    const originalIdentifier = getCustomerIdentifierOriginalColumnName.value

    if (originalIdentifier === null) {
      _currentState.user_identifier_col = COLUMN_NONE
    }
    else if (customerIdentifierColNameDoesExistInAsset.value) {
      _currentState.user_identifier_col = originalIdentifier
    }
    else {
      _currentState.user_identifier_col = undefined
    }

    _currentState.event_time_col = getEventTimeCol() || OFFERFIT_RECEIVED_TS_COLUMN
  }

  /**
   * Upload-progress callback for the "upload to parse" request.
   * Stores the progress in the state as a whole-number percentage.
   *
   * The previous `Math.round(loaded / total)` could only produce 0 or 1, so the stored
   * progress jumped straight from 0 to 1 at the halfway point.
   * NOTE(review): assumes consumers of `uploadingToParseProgress` expect 0-100 — confirm.
   *
   * @param progressEvent event carrying `loaded` and `total`; missing values fall back to 1 and 100.
   */
  const uploadToParseProgressHandler = (progressEvent: any): void => {
    const total = progressEvent?.total || 100
    const loaded = progressEvent?.loaded || 1
    _currentState.uploadingToParseProgress = Math.round((loaded / total) * 100)
  }

  /**
   * Upload-progress callback for the "upload to bucket" request.
   * Stores the progress in the state as a whole-number percentage.
   *
   * The previous `Math.round(loaded / total)` could only produce 0 or 1, so the stored
   * progress jumped straight from 0 to 1 at the halfway point.
   * NOTE(review): assumes consumers of `uploadingToBucketProgress` expect 0-100 — confirm.
   *
   * @param progressEvent event carrying `loaded` and `total`; missing values fall back to 1 and 100.
   */
  const uploadToBucketProgressHandler = (progressEvent: any): void => {
    const total = progressEvent?.total || 100
    const loaded = progressEvent?.loaded || 1
    _currentState.uploadingToBucketProgress = Math.round((loaded / total) * 100)
  }

  /**
   * Saves a fresh feature-config entry for the given asset key through the pipeline store.
   *
   * @param key asset key the new feature-config object is stored under.
   */
  const createAssetInFeatureConfig = async (key: string) => {
    const featureConfigByAsset = { [key]: getNewAssetFeatureConfigObject() } as defaultCreateFeatureWrapper
    await store.dispatch('pipeline/saveNewAssetFeaturesConfig', featureConfigByAsset)
  }

  /**
   * Creates a new data asset from the form state.
   *
   * Steps: validate the form, upload the selected sample file (if any), build the asset
   * config, generate the fields metadata, save everything, then notify `onAssetCreated`.
   *
   * The pending flag is now reset in a `finally` block, so a failure while generating
   * metadata or saving no longer leaves the form permanently busy. Such failures still
   * propagate to the caller, as before.
   *
   * @returns true when the asset was saved; false when validation or the sample upload failed.
   */
  const addNewAsset = async (): Promise<boolean> => {
    if (!validateFields()) { return false }

    _currentState.isDataAssetRequestPending = true

    let assetData: IDataAsset
    try {
      if (_currentState.files.length) {
        try {
          await SampleDataService.uploadSampleDataFileToBucket(
            _currentState.files[0],
            assetSlugDisplayName.value,
            uploadToBucketProgressHandler,
          )
        }
        catch (reason: any) {
          // An upload failure is reported to the user and aborts the creation.
          Global.error(`Failed to upload sample file to GCS. ${String(reason)}`)
          return false
        }
      }

      assetData = mergeDeep(getDataAssetTemplate(), {
        separator: escapeTabs(_currentState.assetCSVSeparator) || DEFAULT_CSV_SEPARATOR,
        display_name: _currentState.assetDisplayName,
        optional: !_currentState.assetRequired,
        description: _currentState.assetDescription,
        file_name_format: _currentState.assetFilePathFormat,
        cpu_limit: _currentState.assetCpuLimit,
        memory_limit: _currentState.assetMemoryLimit,
        partition_col: _currentState.assetPartitionColumn,
        data_format: _currentState.assetDataFormat,
        extension: _currentState.assetDataFormat && fileExtensionsMap.get(_currentState.assetDataFormat),
        drop_duplicated_rows: true,
        enforce_pk: true,
        standardize_fields_after_preprocess: true,
        asset_type: _currentState.assetType,
        // Snapshot assets get the received-timestamp column as their primary key.
        pk_cols: _currentState.assetType === DATA_ASSET_TYPES.SNAPSHOT ? [OFFERFIT_RECEIVED_TS_COLUMN] : [],
      })

      const { newMetadata } = await generateFieldsMetadata(
        assetSlugDisplayName.value,
        dataAssetColumns.value,
        getDataAssetFieldTypes(),
      )

      await createOrSaveDataAssetConfig(assetData, newMetadata)
    }
    finally {
      _currentState.isDataAssetRequestPending = false
    }

    if (onAssetCreated) {
      // Fire-and-forget: a failing callback is logged and does not affect the result.
      Promise.resolve(onAssetCreated(assetData)).catch((error: any) => {
        console.error(error.message)
      })
    }

    return true
  }

  /**
   * Writes the asset config into the store, synchronizes the feature mapping and sends
   * the updated pipeline to the API, then refreshes the "original copy" of the asset.
   *
   * @param assetData asset config to store under the current asset key
   *   (falls back to the display-name slug when no asset is selected).
   * @param newMetadata optional assets metadata to commit; when given, the pipeline
   *   update is sent with `updateExpectations` set to true.
   */
  const createOrSaveDataAssetConfig = async (assetData: IDataAsset, newMetadata?: MetadataModel) => {
    const targetKey = assetSlugName.value ?? assetSlugDisplayName.value

    // Store the changed config (this also refreshes the timestamp).
    addOrUpdateDataAsset({ key: targetKey, asset: assetData } as DataAssetWrapper)

    if (newMetadata) {
      store.commit('pipeline/assetsMetadata', newMetadata)
    }

    // Make sure the asset has an entry in the features config.
    if (!filteringFeatureAssetsForCurrentAsset.value) {
      await createAssetInFeatureConfig(targetKey)
    }

    await addEventColumnToMapping(_currentState.event_time_col)
    await addUserIdentifierToMapping(_currentState.user_identifier_col)

    // Push the updated config to the API.
    const updateContext = {
      assetName: targetKey,
      clientId: store.getters['client/client'].id,
      pipelineName: store.getters['pipeline/pipeline']?.name,
      updateExpectations: !!newMetadata,
    } as PipelineUpdateContext
    await store.dispatch('pipeline/updatePipeline', updateContext)

    // Keep a detached copy of what was just saved.
    const savedAssetCopy = structuredClone(deepToRaw(assetData))
    if (_currentState.assetWrapperOriginalCopy) {
      _currentState.assetWrapperOriginalCopy.asset = savedAssetCopy
    }
    else {
      _currentState.assetWrapperOriginalCopy = { key: targetKey, asset: savedAssetCopy }
    }
  }

  /**
   * Saves the form state into the existing data asset.
   *
   * Validation always runs so that field errors are refreshed, but in autosave mode the
   * save proceeds even when validation fails. A selected sample file is uploaded first
   * when an asset key is available.
   *
   * The pending flag is now reset in a `finally` block, so a rejected upload or save no
   * longer leaves the form permanently busy. Such failures still propagate to the caller.
   *
   * @returns true when the asset was saved; false when validation failed outside autosave mode.
   */
  const updateAsset = async (): Promise<boolean> => {
    const isValid = validateFields()
    if (!isValid && !autosave) { return false }

    _currentState.isDataAssetRequestPending = true
    try {
      if (_currentState.files.length && assetSlugName.value) {
        await SampleDataService.uploadSampleDataFileToBucket(
          _currentState.files[0],
          assetSlugName.value,
          uploadToBucketProgressHandler,
        )
      }

      const originalAsset = _currentState.assetWrapperOriginalCopy?.asset

      // Template <- originally loaded asset <- current form values.
      const assetData: IDataAsset = mergeDeep(
        getDataAssetTemplate(),
        mergeDeep(originalAsset, {
          separator: escapeTabs(_currentState.assetCSVSeparator) || DEFAULT_CSV_SEPARATOR,
          display_name: _currentState.assetDisplayName,
          optional: !_currentState.assetRequired,
          description: _currentState.assetDescription,
          file_name_format: _currentState.assetFilePathFormat,
          cpu_limit: _currentState.assetCpuLimit,
          memory_limit: _currentState.assetMemoryLimit,
          asset_type: _currentState.assetType,
          partition_col: _currentState.assetPartitionColumn,
          data_format: _currentState.assetDataFormat,
          extension: _currentState.assetDataFormat && fileExtensionsMap.get(_currentState.assetDataFormat),
          drop_duplicated_rows: getFieldValueOrDefault(originalAsset?.drop_duplicated_rows, true),
          enforce_pk: getFieldValueOrDefault(originalAsset?.enforce_pk, true),
          standardize_fields_after_preprocess: getFieldValueOrDefault(
            originalAsset?.standardize_fields_after_preprocess,
            true,
          ),
        }),
      )

      // Column-level settings come from the pipeline config, not from the form.
      if (dropColumns.value) {
        assetData.drop_cols = dropColumns.value
      }
      if (descriptionColumns.value) {
        assetData.column_descriptions = descriptionColumns.value
      }
      if (primaryKeyColumns.value) {
        assetData.pk_cols = primaryKeyColumns.value
      }
      if (renameColumns.value) {
        assetData.rename_cols = renameColumns.value
      }

      await createOrSaveDataAssetConfig(assetData)
    }
    finally {
      _currentState.isDataAssetRequestPending = false
    }

    return true
  }

  /**
   * Loads the sample data file info for the current asset into the state.
   *
   * - Bucket queries disabled: stores `null` and shows a warning.
   * - No current asset key: does nothing.
   * - Otherwise: stores the service answer ('None' becomes `null`) and, when a file was
   *   found, adopts its data format.
   *
   * The pending flag is reset in a `finally` block so that a rejected request does not
   * leave the form busy; the rejection itself still propagates.
   */
  const getSampleDataFileInfoFromBucket = async () => {
    if (!AppSettings.BUCKET_QUERIES_ENABLED) {
      _currentState.sampleDataFileInfo = null
      Global.message.warning('Feature disabled: Bucket queries')
      return
    }

    if (!assetSlugName.value) { return }

    _currentState.isDataAssetRequestPending = true
    try {
      const answer: 'None' | BucketFileInfo = await SampleDataService.getSampleDataFileInfoFromBucket(assetSlugName.value)
      _currentState.sampleDataFileInfo = answer === 'None' ? null : answer
      if (_currentState.sampleDataFileInfo) {
        _currentState.assetDataFormat = _currentState.sampleDataFileInfo.dataFormat
      }
    }
    finally {
      _currentState.isDataAssetRequestPending = false
    }
  }

  /**
   * Looks up the dataset file matching a file path format in the bucket.
   *
   * The pending flag is reset in a `finally` block so that a rejected request does not
   * leave the form busy; the rejection itself still propagates.
   *
   * @param filePathFormat file path format to look up; an empty value resolves to undefined.
   * @param withoutBlocking forwarded unchanged to the asset data service.
   * @returns the file info, `null` when the service answers 'None' or bucket queries are
   *   disabled (a warning is shown in that case), or undefined for an empty format.
   */
  const getDatasetFileInfoFromBucket = async (
    filePathFormat: string,
    withoutBlocking: boolean = false,
  ): Promise<BucketFileInfoType> => {
    if (!AppSettings.BUCKET_QUERIES_ENABLED) {
      Global.message.warning('Feature disabled: Bucket queries')
      return null
    }

    if (!filePathFormat) { return undefined }

    _currentState.isDataAssetRequestPending = true
    try {
      const answer: 'None' | BucketFileInfo = await AssetDataService.getDataFileInfoFromBucket(filePathFormat, withoutBlocking)
      return answer === 'None' ? null : answer
    }
    finally {
      _currentState.isDataAssetRequestPending = false
    }
  }

  // Cache of the rows returned by the last successful bucket read.
  const rawDataAssetRowsFromBQ = ref<Record<string, string>[] | undefined>()

  /**
   * Loads sample rows of the target file from the bucket, reusing cached rows when present.
   *
   * @param limit maximum number of rows to request (a falsy value falls back to 100).
   * @returns the cached or freshly loaded rows; an empty array when there is no target
   *   file or parsing failed; `null` when bucket queries are disabled.
   */
  const loadSourceSampleDataRows = async (limit: number = 100) => {
    const cachedRows = rawDataAssetRowsFromBQ.value
    if (cachedRows?.length) {
      return cachedRows
    }

    let loadedRows: Record<string, string>[] | null = null

    if (!AppSettings.BUCKET_QUERIES_ENABLED) {
      return loadedRows
    }

    if (!targetFileInfo.value) {
      return []
    }

    _currentState.isParsingFromBucket = true
    try {
      // Separator precedence: asset setting, then pipeline default, then the global default.
      const parserConfig = {
        separator:
          currentDataAssetWrapper.value?.asset?.separator
          || getPipelinePreprocessConfig.value?.default?.separator
          || DEFAULT_CSV_SEPARATOR,
      }
      loadedRows = await AssetDataService.getDataRowsFromBucketFile(
        immutableAsset.value?.data_format as DataAssetFormat,
        targetFileInfo.value?.name,
        parserConfig,
        undefined,
        limit || 100,
      )
      rawDataAssetRowsFromBQ.value = loadedRows
    }
    catch (error: any) {
      Global.error(error.message || 'getDataRowsFromBucketSampleDataFile: Sample data parsing error, please check data format.')
      loadedRows = []
    }
    _currentState.isParsingFromBucket = false

    return loadedRows
  }

  /**
   * Loads sample values of a single column from the target file in the bucket.
   *
   * Previously the "no target file" branch assigned an empty result but then returned
   * `undefined`; it now returns an empty array like every other path.
   *
   * @param columnName column to read.
   * @param limit maximum number of rows to request.
   * @returns the column values, or an empty array when bucket queries are disabled,
   *   no target file is known, or the read failed (the error is reported to the user).
   */
  const getSampleDataForColumn = async (columnName: string, limit: number = 15) => {
    if (!AppSettings.BUCKET_QUERIES_ENABLED || !targetFileInfo.value) {
      return []
    }

    let result: Record<string, string>[] = []
    try {
      result = await AssetDataService.getDataRowsFromBucketFile(
        immutableAsset.value?.data_format as DataAssetFormat, // dataFormat
        targetFileInfo.value?.name, // fileName
        {
          // Separator precedence: asset setting, then pipeline default, then the global default.
          separator:
            currentDataAssetWrapper.value?.asset?.separator
            || getPipelinePreprocessConfig.value?.default?.separator
            || DEFAULT_CSV_SEPARATOR,
        }, // config
        [columnName], // columns
        limit, // limit
      )
    }
    catch (error: any) {
      Global.error(error.message || 'getDataRowsFromBucketSampleDataFile: Sample data parsing error, please check data format.')
      result = []
    }

    return result.map(dataObject => dataObject[columnName])
  }

  /**
   * Refreshes the file info for the current asset.
   * With a stored file name format the dataset file is looked up; without one the
   * sample data file is looked up, provided an asset key exists.
   *
   * @param withoutBlocking forwarded unchanged to the dataset file lookup.
   */
  const updateAssetDataFileInfo = async (withoutBlocking: boolean = false): Promise<void> => {
    const storedPathFormat = currentDataAssetWrapper.value?.asset?.file_name_format || ''

    if (!storedPathFormat) {
      if (assetSlugName.value) {
        await getSampleDataFileInfoFromBucket()
      }
      return
    }

    const fileInfo = await getDatasetFileInfoFromBucket(storedPathFormat, withoutBlocking)
    _currentState.datasetFileInfo = fileInfo

    if (fileInfo) {
      _currentState.assetDataFormat = fileInfo.dataFormat
    }
    else {
      // No dataset file found: drop the previously loaded headers.
      _currentState.assetDataHeaders = null
    }
  }

  /**
   * Loads headers and dtypes for the current data file (dataset file first, sample file
   * otherwise) into the state. Does nothing when the data format or the file name is missing.
   * On failure the headers are cleared and a data-format error message is set.
   *
   * @param withoutBlocking forwarded unchanged to the asset data service.
   */
  const getAssetDataFields = async (withoutBlocking: boolean = false): Promise<void> => {
    const dataFileName = _currentState.datasetFileInfo?.name || _currentState.sampleDataFileInfo?.name

    if (!_currentState.assetDataFormat || !dataFileName) { return }

    _currentState.assetDataFormatError.value = null
    _currentState.isParsingFromBucket = true

    // Remember which asset the request was made for, so a late answer for another
    // asset is not applied to the current one.
    const requestedAssetName = assetSlugName.value
    try {
      const parserConfig = { separator: escapeTabs(_currentState.assetCSVSeparator) || DEFAULT_CSV_SEPARATOR }
      // if true will use fewer rows for dtypes scanning
      const hasKnownColumns = getPipelineMetadata.value?.assets?.[assetSlugName.value || '']?.columns?.length > 1 || false

      const metaData: BucketAssetMetadata | void = await AssetDataService.getMetadataForBucketFile(
        _currentState.assetDataFormat,
        dataFileName,
        parserConfig,
        withoutBlocking,
        hasKnownColumns,
        assetSlugName.value,
      )

      const isSameAsset = requestedAssetName === assetSlugName.value
      if (metaData && isSameAsset) {
        _currentState.assetDataHeadersdTypes = metaData.dtypes
        _currentState.assetDataHeaders = metaData.headers
      }
    }
    catch {
      _currentState.assetDataHeaders = null
      _currentState.assetDataFormatError.value = 'Asset data parsing error, please check whether the latest asset file has correct data format.'
    }

    _currentState.isParsingFromBucket = false
  }

  // Returns the dtypes stored in the state by the last successful metadata load (if any).
  const getDataAssetFieldTypes = (): AssetDTypes | undefined => _currentState.assetDataHeadersdTypes

  // Debounced wrapper around the partition-column-dependent change handler.
  // The handler is referenced lazily because it is declared further down.
  const debouncedPartitionColumnDependedFieldChange = debounce(
    async (): Promise<void> => {
      await assetFieldPartitionColumnDependedChangeHandler()
    },
    REQUEST_DEBOUNCE_DELAY,
  )

  // Debounced save: only runs when an asset is currently selected.
  const debouncedFieldChange = debounce(async () => {
    const selectedKey = currentDataAssetWrapper?.value?.key
    if (!selectedKey) { return }
    await updateAsset()
  }, REQUEST_DEBOUNCE_DELAY)

  // -- User interaction handlers --

  /**
   * Generic change handler for form fields: clears the field error (if given) and
   * triggers the debounced save, optionally followed by the debounced partition-column refresh.
   *
   * @param errorStringRef error holder of the changed field; its value is reset to null.
   * @param updatePartitionColumns when true, also triggers the debounced
   *   partition-column-dependent refresh.
   */
  const onAssetFieldValueChange = async (
    errorStringRef?: UnwrapNestedRefs<NullStringObject> | undefined,
    updatePartitionColumns: boolean = false,
  ) => {
    if (errorStringRef) {
      errorStringRef.value = null
    }

    await debouncedFieldChange()

    if (!updatePartitionColumns) { return }
    await debouncedPartitionColumnDependedFieldChange()
  }

  /**
   * Accepts the dataset file found for the typed path format: promotes it to the active
   * dataset file, commits the typed format, reloads the data fields and saves in autosave mode.
   */
  const wantToUseClickHandler = async () => {
    const acceptedFileInfo = _currentState.foundDatasetFileInfo

    _currentState.datasetFileInfo = acceptedFileInfo
    _currentState.assetDataFormat = acceptedFileInfo?.dataFormat
    _currentState.assetFilePathFormat = _currentState.assetFilePathFormatInput
    _currentState.foundDatasetFileInfo = undefined

    await getAssetDataFields()

    if (!autosave) { return }
    await updateAsset()
  }

  /**
   * Handles edits of the file path format input.
   * An empty input is committed immediately; a non-empty input is validated and, when
   * valid, triggers the debounced lookup of a matching file.
   */
  const assetFilePathFormatChangeHandler = () => {
    const typedFormat = _currentState.assetFilePathFormatInput

    if (!typedFormat) {
      _currentState.foundDatasetFileInfo = undefined
      _currentState.assetFilePathFormat = typedFormat
      onAssetFieldValueChange(_currentState.assetFilePathFormatError, true)
      return
    }

    _currentState.assetFilePathFormatError.value = null

    const isWellFormed = validatePathString(typedFormat) && validatePathStringDate(typedFormat)
    if (isWellFormed) {
      debouncedFileSeeking()
    }
    else {
      _currentState.assetFilePathFormatError.value = FILE_PATH_FORMAT_ERROR
    }
  }

  // Debounced lookup of the file matching the typed path format. When the found file is
  // the one already in use, the typed format is committed right away.
  const debouncedFileSeeking = debounce(async () => {
    const typedFormat = _currentState.assetFilePathFormatInput
    if (!typedFormat) { return }

    _currentState.foundDatasetFileInfo = await getDatasetFileInfoFromBucket(typedFormat)

    const isSameFile = targetFileInfo.value?.name === _currentState.foundDatasetFileInfo?.name
    if (!isSameFile) { return }

    _currentState.assetFilePathFormat = _currentState.assetFilePathFormatInput
    onAssetFieldValueChange()
  }, REQUEST_DEBOUNCE_DELAY)

  /**
   * Handles a file selection or drop: uploads the first file for parsing and, in
   * autosave mode, uploads it to the bucket in parallel.
   *
   * Changes compared to the previous version:
   * - a failed bucket upload is now reported to the user instead of being silently
   *   dropped by `Promise.allSettled`;
   * - the file list lookup is guarded, so an event without `target.files` and without
   *   `dataTransfer` yields "no files" instead of a TypeError;
   * - the uploading flag is reset in `finally`.
   *
   * @param event change or drop event; files are read from `target.files`, falling back
   *   to `dataTransfer.files`.
   */
  const inputFileChangeHandler = async (event: any): Promise<void> => {
    _currentState.files = Array.from(event?.target?.files || event?.dataTransfer?.files || [])
    if (!_currentState.files.length) { return }

    _currentState.isUploadingToParse = true

    const allPromises = [
      SampleDataService.uploadSampleDataFileToParse(
        _currentState.files[0],
        _currentState.assetDataFormat || 'csv',
        {
          // Separator precedence: asset setting, then pipeline default, then the global default.
          separator:
            currentDataAssetWrapper.value?.asset?.separator
            || getPipelinePreprocessConfig.value?.default?.separator
            || DEFAULT_CSV_SEPARATOR,
        },
        uploadToParseProgressHandler,
      ),
    ]

    if (autosave) {
      allPromises.push(
        SampleDataService.uploadSampleDataFileToBucket(_currentState.files[0], assetSlugName.value, uploadToBucketProgressHandler),
      )
    }

    try {
      const [headersDataResult, bucketUploadResult] = await Promise.allSettled(allPromises)

      // Only present in autosave mode; report the failure, parsing can still succeed.
      if (bucketUploadResult?.status === 'rejected') {
        Global.error(`Failed to upload sample file to GCS. ${String(bucketUploadResult.reason)}`)
      }

      if (headersDataResult.status === 'rejected') { throw headersDataResult.reason }

      const { headers, dataFormat, name } = headersDataResult.value
      _currentState.assetDataHeaders = headers
      _currentState.assetDataFormat = dataFormat
      _currentState.parsedFileName = name
    }
    catch (error: any) {
      _currentState.assetDataFormatError.value = error?.message || 'Sample data parsing error, please check data format.'
    }
    finally {
      _currentState.isUploadingToParse = false
    }
  }

  /**
   * Handles a file dropped on the uploader area: clears the data-format error, processes
   * the dropped file and clears the "hovering over the upload area" flag.
   *
   * The flag is reset in `finally`, so it is cleared even when processing the drop throws.
   *
   * @param event drop event forwarded to the file change handler.
   */
  const dropOverUploaderAreaHandler = async (event: any): Promise<void> => {
    _currentState.assetDataFormatError.value = null
    try {
      await inputFileChangeHandler(event)
    }
    finally {
      _currentState.isOverUploadingArea = false
    }
  }

  /**
   * Refreshes everything that depends on the asset's file location: drops stale file
   * info, reloads the file info and data fields, and re-validates the form.
   */
  const assetFieldPartitionColumnDependedChangeHandler = async (): Promise<void> => {
    // Without a path format there is no dataset file; without a display name there is
    // no sample file.
    if (!_currentState.assetFilePathFormat) { _currentState.datasetFileInfo = undefined }
    if (!_currentState.assetDisplayName) { _currentState.sampleDataFileInfo = undefined }

    await updateAssetDataFileInfo()

    // Fall back to the default separator before parsing.
    _currentState.assetCSVSeparator = _currentState.assetCSVSeparator || DEFAULT_CSV_SEPARATOR

    await getAssetDataFields()
    validateFields()
  }

  // Data format changed: clear its error and trigger the debounced save, then reload
  // the data fields and re-validate the form.
  const assetDataFormatFieldChangeHandler = async () => {
    const formatError = _currentState.assetDataFormatError
    await onAssetFieldValueChange(formatError)
    await getAssetDataFields()
    validateFields()
  }

  // id columns
  // Engineering-features composable for the current asset key; it receives the same
  // "no defaultStateObject supplied" flag that is passed to usePipelineConfig.
  const engineerConfig: EngineerFeaturesReactive = useEngineeringFeatures(!defaultStateObject, pipelineConfig.currentDataAssetKey)
  // Factory for the feature-config object stored when an asset is added to the features config.
  const { getNewAssetFeatureConfigObject } = engineerConfig

  // User identifier selection changed: clear its error and trigger the debounced save
  // (not awaited), then store a non-empty selection in the feature mapping.
  const userIdentifierSelectEmitHandler = async () => {
    onAssetFieldValueChange(_currentState.userIdentifierColumnError)

    const selectedColumn = _currentState.user_identifier_col
    if (!selectedColumn) { return }

    await addUserIdentifierToMapping(selectedColumn)
  }

  /**
   * Initializes the given key of the current asset's config with an empty object via the store.
   *
   * @param key asset config key to initialize.
   */
  const createItemInAssetPreprocessConfig = (key: keyof IDataAsset) => {
    const targetAsset = currentDataAssetWrapper.value?.asset
    const emptyEntry: StoreDataChangePayload = { fieldObject: targetAsset, key, value: {} }
    store.commit('updateObjectInStore', emptyEntry)
  }

  /**
   * Keeps the `rename_cols` preprocessing config in sync with the user identifier column
   * stored in the features mapping, so that the identifier ends up renamed to the
   * customer id column.
   *
   * @returns true when the rename config was changed in the store, false otherwise.
   */
  const renameCustomerIdInPreprocessing = (): boolean => {
    let configDataChanged: boolean = false

    // if there is no customer identifier value, let's clear renaming for customer_id columns
    if (!featuresMappingUserIdentifierColumn.value && renameColumns.value) {
      const customerIdRenameField = Object.keys(renameColumns.value).filter(
        (column: string) => renameColumns.value?.[column] === CUSTOMER_ID_COLUMN,
      )

      if (customerIdRenameField.length) {
        customerIdRenameField.forEach((column: string) => {
          store.commit('removeObjectInStore', {
            fieldObject: renameColumns.value,
            key: column,
          } as StoreDataChangePayload)
        })

        configDataChanged = true

        // NOTE(review): this early return skips the task-run metadata commit at the end
        // of the function even though the config changed — confirm this is intended.
        return configDataChanged
      }
    }

    // if data asset has customer_id originally, just make sure that in 'rename_cols' we don't have 'customer_id' as target name
    if (
      renameColumns.value
      && Object.values(renameColumns.value).includes(CUSTOMER_ID_COLUMN)
      && featuresMappingUserIdentifierColumn.value === CUSTOMER_ID_COLUMN
      && haveHeadersLoaded.value
      && _currentState.assetDataHeaders?.includes(CUSTOMER_ID_COLUMN)
    ) {
      // Only the first matching source column is removed here.
      const colname = Object.keys(renameColumns.value).find((key: string) => renameColumns.value?.[key] === CUSTOMER_ID_COLUMN)
      store.commit('removeObjectInStore', {
        fieldObject: renameColumns.value,
        key: colname,
      } as StoreDataChangePayload)

      configDataChanged = true

      // NOTE(review): same as above — the metadata commit below is skipped on this path.
      return configDataChanged
    }

    // need to create rename_cols if it does not exist
    if (currentDataAssetWrapper.value && !renameColumns.value) {
      createItemInAssetPreprocessConfig('rename_cols')
      configDataChanged = true
    }

    if (renameColumns.value) {
      // eventually we need `customer_id` as user identifier in features mapping
      if (featuresMappingUserIdentifierColumn.value && featuresMappingUserIdentifierColumn.value !== CUSTOMER_ID_COLUMN) {
        // need to update renaming
        // Other columns that are still renamed to the customer id column must be released first.
        const alreadyNamed = Object.keys(renameColumns.value).filter(
          (key: string) => renameColumns.value?.[key] === CUSTOMER_ID_COLUMN && key !== featuresMappingUserIdentifierColumn.value,
        )

        for (let i = 0; i < alreadyNamed.length; i++) {
          const alreadyNamedElement = alreadyNamed[i]
          store.commit('removeObjectInStore', {
            fieldObject: renameColumns.value,
            key: alreadyNamedElement,
          } as StoreDataChangePayload)

          configDataChanged = true
        }

        // if customer identifier column has not been renamed to customer_id, rename it
        if (renameColumns.value?.[featuresMappingUserIdentifierColumn.value] !== CUSTOMER_ID_COLUMN) {
          store.commit('updateObjectInStore', {
            fieldObject: renameColumns.value,
            key: featuresMappingUserIdentifierColumn.value,
            value: CUSTOMER_ID_COLUMN,
          } as StoreDataChangePayload)

          configDataChanged = true
        }
      }
    }

    if (configDataChanged) {
      // update metadata to reflect that rename cols (preprocessing config) was changed
      store.commit('pipeline/addTaskRunMetaData', {
        asset: currentDataAssetWrapper?.value?.key,
        payload: {
          key: `${TASK_NAME.PREPROCESS}_last_modified`,
          value: new Date().toUTCString(),
        } as StoreDataChangePayload,
      })
    }

    return configDataChanged
  }

  /**
   * Aligns the feature mapping with the column renames of the current asset:
   * - when some column is renamed to the customer id column but the mapping does not
   *   use it yet, the mapping is switched to the customer id column;
   * - when the mapped event time column has been renamed, the mapping is switched to
   *   the new name.
   *
   * @returns true when the feature mapping was updated, false otherwise.
   */
  const updateFeatureConfigWithRenamedColumnMapping = async () => {
    // Snapshot of the renames taken before any mapping update.
    const renamesAtStart = renameColumns.value || {}
    const renamedFrom = Object.keys(renamesAtStart)
    const renamedTo = Object.values(renamesAtStart)

    let mappingChanged = false

    const mappedIdentifier = filteringFeatureAssetsForCurrentAsset.value?.mapping?.user_identifier_col || ''
    if (mappedIdentifier !== CUSTOMER_ID_COLUMN && renamedTo.includes(CUSTOMER_ID_COLUMN)) {
      await addUserIdentifierToMapping(CUSTOMER_ID_COLUMN)
      mappingChanged = true
    }

    let eventTimeColumn: NullString = filteringFeatureAssetsForCurrentAsset.value?.mapping?.event_time_col || ''
    if (eventTimeColumn && renamedFrom.includes(eventTimeColumn)) {
      // this column has been renamed, use new name in Feature Config
      eventTimeColumn = renameColumns.value?.[eventTimeColumn] || undefined
    }

    const mappedEventTime = filteringFeatureAssetsForCurrentAsset.value?.mapping?.event_time_col
    if (eventTimeColumn && mappedEventTime !== eventTimeColumn) {
      await addEventColumnToMapping(eventTimeColumn)
      mappingChanged = true
    }

    return mappingChanged
  }

  /**
   * Name of the customer identifier column as it appears before renaming.
   *
   * - `null` when the mapped user identifier is explicitly `null`
   * - the rename source key when the mapping uses `CUSTOMER_ID_COLUMN` and some column is renamed to it
   * - `CUSTOMER_ID_COLUMN` itself when the loaded headers already contain it
   * - the mapped identifier as-is when it differs from `CUSTOMER_ID_COLUMN`
   * - `undefined` otherwise (including when there is no current asset wrapper)
   */
  const getCustomerIdentifierOriginalColumnName: ComputedRef<NullString> = computed<NullString>(() => {
    const mappedIdentifier = featuresMappingUserIdentifierColumn.value

    if (mappedIdentifier === null) { return null }
    if (!currentDataAssetWrapper?.value) { return undefined }

    if (mappedIdentifier !== CUSTOMER_ID_COLUMN) {
      // a non-empty identifier that is not customer_id has not been renamed: it is the original name
      return mappedIdentifier || undefined
    }

    const renameMap = renameColumns.value

    if (mappedIdentifier && renameMap && Object.values(renameMap).includes(CUSTOMER_ID_COLUMN)) {
      // has been renamed, need to return the key it was renamed from
      return Object.keys(renameMap).find((key: string) => renameMap[key] === CUSTOMER_ID_COLUMN)
    }

    const headersContainCustomerId = haveHeadersLoaded.value
      && _currentState.assetDataHeaders?.includes(CUSTOMER_ID_COLUMN)

    return headersContainCustomerId ? CUSTOMER_ID_COLUMN : undefined
  })

  /**
   * Whether the original (pre-rename) customer identifier column is present in the asset columns.
   * Treated as present while headers have not been loaded yet.
   */
  const customerIdentifierColNameDoesExistInAsset = computed<boolean>(() => {
    // nothing to check against until headers are available
    if (!haveHeadersLoaded.value) { return true }

    const isCustomerIdentifierColumn = (columnName: string) => columnName === getCustomerIdentifierOriginalColumnName.value
    const availableColumns = dataAssetColumns.value

    return availableColumns
      && availableColumns.find(isCustomerIdentifierColumn) !== undefined
  })

  /**
   * Whether the event time column from the feature mapping exists in the asset columns.
   * The mapping may hold the post-rename name, so it is translated back to the rename source key first.
   * `false` when the asset columns are not available.
   */
  const eventTimeColNameDoesExistInAsset: ComputedRef<boolean> = computed<boolean>(() => {
    const availableColumns = dataAssetColumns.value

    if (!availableColumns) { return false }

    const configuredEventTimeCol: NullString = filteringFeatureAssetsForCurrentAsset.value?.mapping?.event_time_col
    const renameMap = renameColumns.value || {}

    let sourceColumnName = configuredEventTimeCol

    if (configuredEventTimeCol && Object.values(renameMap).includes(configuredEventTimeCol)) {
      // has been renamed, look the column up by its name before the rename
      const renameEntry = Object.entries(renameMap).find(([, renamedTo]) => renamedTo === configuredEventTimeCol)
      sourceColumnName = renameEntry?.[0] as string
    }

    return availableColumns.includes(sourceColumnName || '')
  })

  /**
   * Event time column name as it should be used after renaming.
   * `undefined` when the asset columns are not available.
   */
  const getEventTimeColName = computed<NullString>(() => {
    if (!dataAssetColumns.value) { return undefined }

    const configuredEventTimeCol: NullString = filteringFeatureAssetsForCurrentAsset.value?.mapping?.event_time_col
    const renameMap = renameColumns.value

    // event time col/override_date_col expects the name *after* rename:
    // a configured value that is still a rename source key is translated to its new name
    if (configuredEventTimeCol != null && renameMap && Object.keys(renameMap).includes(configuredEventTimeCol)) {
      return renameMap[configuredEventTimeCol]
    }

    // not a rename source: the configured value is already the final name
    return configuredEventTimeCol
  })

  /**
   * Stores `value` as the `user_identifier_col` of the current asset's column mapping.
   * Empty values are ignored; `COLUMN_NONE` is stored as `null`.
   */
  const addUserIdentifierToMapping = async (value: NullString) => {
    // an empty value would only produce an invalid mapping, so nothing is dispatched
    if (value) {
      const mappingUpdate: ColumnMappingPayload = {
        map_value: value === COLUMN_NONE ? null : value,
        col_value: 'user_identifier_col',
        asset: assetSlugName.value ?? assetSlugDisplayName.value,
      }

      await store.dispatch('pipeline/updateColumnMappingInConfig', mappingUpdate)
    }
  }

  /**
   * Stores `value` as the `event_time_col` of the current asset's column mapping.
   * `COLUMN_NONE` is stored as `null`; any other value (including an empty one) is passed through.
   */
  const addEventColumnToMapping = async (value: NullString) => {
    const mappedValue = value === COLUMN_NONE ? null : value
    const mappingUpdate: ColumnMappingPayload = {
      map_value: mappedValue,
      col_value: 'event_time_col',
      asset: assetSlugName.value ?? assetSlugDisplayName.value,
    }

    await store.dispatch('pipeline/updateColumnMappingInConfig', mappingUpdate)
  }

  /** Post-rename event time column name, or `undefined` when that column is not present in the asset. */
  const getEventTimeCol = () => {
    if (!eventTimeColNameDoesExistInAsset.value) {
      return undefined
    }

    return getEventTimeColName.value
  }

  /**
   * Regenerates the validation metadata of one asset from its current column list.
   *
   * Existing per-column metadata (matched by column name) is preserved; columns without metadata get
   * disabled uniqueness/not-null rules and a type derived from `dtypes` (`DataType.STRING` when `dtypes`
   * is not provided). When the pipeline has no metadata for `assetKey` yet, an empty template is
   * committed to the store first.
   *
   * @param assetKey key of the asset inside `metadata.assets`
   * @param fields column names of the asset; when empty or missing, the asset's existing columns are kept
   * @param dtypes optional column name -> dtype lookup used to pick the default validation type
   * @returns `oldMetadata` (raw pipeline metadata as read at the start of the call) and the regenerated `newMetadata`
   */
  const generateFieldsMetadata = async (
    assetKey: string,
    fields: string[],
    dtypes?: AssetDTypes,
  ): Promise<{ oldMetadata: MetadataModel, newMetadata: MetadataModel }> => {
    const newMetadata: MetadataModel = { assets: {}, version: LATEST_VERSION }
    const oldMetadata = toRaw(getPipelineMetadata.value)
    let oldMetadataCopy = cloneDeep(oldMetadata)

    // callers pass reactive values that may not be populated yet; treat a missing list as "no fields"
    const fieldNames: string[] = fields ?? []

    // if there is no metadata, we should make an empty template
    if (!oldMetadataCopy?.assets?.[assetKey]) {
      // `== null` so that a `null` metadata value is handled like a missing one
      if (oldMetadataCopy == null) {
        oldMetadataCopy = {
          assets: {},
          version: LATEST_VERSION,
        }
      }
      if (oldMetadataCopy.version === undefined) {
        oldMetadataCopy.version = LATEST_VERSION
      }
      if (oldMetadataCopy.assets == null) {
        oldMetadataCopy.assets = {}
      }
      oldMetadataCopy.assets[assetKey] = {
        updatedAt: new Date().toUTCString(),
        updates: {},
        table: {
          minRows: undefined,
          maxRows: undefined,
          compoundUniqueField: undefined,
        },
        columns: [],
      } as AssetMetadataModel

      store.commit('pipeline/assetsMetadata', oldMetadataCopy)
      // NOTE: from here on this is the raw store value again, not a detached clone
      oldMetadataCopy = toRaw(getPipelineMetadata.value)
    }

    await metadataVersioning.toLatest(oldMetadataCopy)

    // asset entries are shared by reference between `oldMetadataCopy` and `newMetadata`
    for (const key of Object.keys(oldMetadataCopy.assets)) {
      newMetadata.assets[key] = oldMetadataCopy.assets[key]
    }

    const assetColumns = newMetadata.assets[assetKey]?.columns || []
    // with no fields to process the existing columns are kept untouched
    const columns: AssetFieldMetadataModel[] = fieldNames.length ? [] : assetColumns

    for (const field of fieldNames) {
      // defaults are built per field so that no rule object is shared between columns
      const validation: AssetFieldValidationRule = {
        type: dtypes ? getDataTypeByDType(dtypes[field]) : DataType.STRING,
        isUnique: {
          enabled: false,
          criticalThreshold: null,
          warningThreshold: null,
        },
        isNotNull: {
          enabled: false,
          criticalThreshold: null,
          warningThreshold: null,
        },
        typeValidation: {
          criticalThreshold: null,
          warningThreshold: null,
        },
      }

      const assetColumn = assetColumns.find((item: AssetFieldMetadataModel) => item.name === field)
      let fieldMetadataObject

      if (field === OFFERFIT_RECEIVED_TS_COLUMN) {
        // existing metadata is spread last, so stored values win over these defaults
        fieldMetadataObject = {
          name: OFFERFIT_RECEIVED_TS_COLUMN,
          excludeFromValidationTable: true,
          ...assetColumn,
        } as AssetFieldMetadataModel
      }
      else {
        fieldMetadataObject = {
          name: field,
          ...assetColumn,
          // each existing validation rule is kept; only missing rules fall back to the defaults
          validation: {
            type: assetColumn?.validation?.type || validation.type,
            isUnique: assetColumn?.validation?.isUnique || validation.isUnique,
            isNotNull: assetColumn?.validation?.isNotNull || validation.isNotNull,
            typeValidation: assetColumn?.validation?.typeValidation || validation.typeValidation,
          } as AssetFieldValidationRule,
        } as AssetFieldMetadataModel
      }

      columns.push(fieldMetadataObject)
    }

    if (!isEqual(assetColumns, columns)) {
      newMetadata.assets[assetKey].columns = columns
    }

    // the user identifier column is always validated as a string; because the asset entry is shared
    // by reference (see above), this lookup also sees the columns assigned to `newMetadata`
    if (oldMetadataCopy?.assets?.[assetKey] && _currentState.user_identifier_col) {
      const customerIdColMetaData = oldMetadataCopy?.assets?.[assetKey]?.columns?.find((item: any) => item.name === _currentState.user_identifier_col)
      if (customerIdColMetaData?.validation?.type) {
        customerIdColMetaData.validation.type = DataType.STRING
      }
    }

    // carry over top-level metadata keys that only exist on the old metadata object
    const newTopLevelKeys = Object.keys(newMetadata || {})
    const oldTopLevelKeys = Object.keys(oldMetadata || {})
    const nonExpectedFields = oldTopLevelKeys.filter((item: string) => !newTopLevelKeys.includes(item))
    nonExpectedFields.forEach((item: string) => {
      // @ts-expect-error
      newMetadata[item] = oldMetadata[item]
    })

    return { oldMetadata, newMetadata }
  }

  /**
   * Runs both column-mapping sync steps and reports which of them changed the config.
   * The preprocessing rename runs first because the feature-mapping step reads the rename config.
   */
  const runColumnMappingConfigProcessing = async () => {
    const renamingChangedConfig = renameCustomerIdInPreprocessing()
    const columnMappingChangedConfig = await updateFeatureConfigWithRenamedColumnMapping()

    return {
      configChangedAfterRenaming: renamingChangedConfig,
      configChangedAfterColumnMapping: columnMappingChangedConfig,
    }
  }

  /**
   * (Re)loads the current data asset into the form state.
   *
   * Steps: optionally cancel a pending load, register the header/config watchers once, reset and
   * reload the form data, notify `onAssetLoaded`, and finally persist the pipeline when the loaded
   * asset differs from its original copy. `isDataAssetRequestPending` is `true` for the whole run
   * and is reset even when a step fails.
   *
   * @param withoutBlocking when `false` and a load is already pending, that load is cancelled first;
   *   the flag is also forwarded to `updateAssetDataFileInfo` and `getAssetDataFields`
   * @param withProcessedHeadersLoading when `true`, `loadAllTypeProcessedDataRows()` runs in parallel
   *   with the form preparation
   */
  const initDataAsset = async (withoutBlocking: boolean = false, withProcessedHeadersLoading: boolean = false) => {
    if (!withoutBlocking && _currentState.isDataAssetRequestPending) {
      AssetDataService.cancelDataAssetLoading()
    }

    _currentState.isDataAssetRequestPending = true

    const parallelThreads = []

    if (withProcessedHeadersLoading) {
      parallelThreads.push(loadAllTypeProcessedDataRows())
    }

    // we need the watcher stopper to have only one instance of watcher at the time
    if (_currentState.assetHeadersWatcher === null) {
      _currentState.assetHeadersWatcher = watch(
        () => _currentState.assetDataHeaders,
        async () => {
          if (currentDataAssetWrapper.value?.key && haveHeadersLoaded.value) {
            _currentState.userIdentifierColumnError.value = null
            _currentState.eventTimeColumnError.value = null

            await runColumnMappingConfigProcessing()

            setColumnDependedValues()
            validateFields()

            store.commit('pipeline/prepareAssetFields', currentDataAssetWrapper.value)

            const { oldMetadata, newMetadata } = await generateFieldsMetadata(
              currentDataAssetWrapper.value?.key,
              dataAssetColumns.value,
              getDataAssetFieldTypes(),
            )
            const metadataChanged = !isEqual(oldMetadata.assets, newMetadata.assets)

            if (metadataChanged && !_currentState.readOnly) {
              // modal flow
              useMetadataChangeModal(oldMetadata, newMetadata, async () => {
                _currentState.isDataAssetRequestPending = true

                try {
                  store.commit('pipeline/assetsMetadata', newMetadata)
                  // awaited so the pending flag actually covers the pipeline update
                  await store.dispatch('pipeline/updatePipeline', {
                    assetName: assetSlugName.value,
                    pipelineName: store.getters['pipeline/pipeline']?.name,
                    clientId: store.getters['client/client'].id,
                    updateExpectations: true,
                  } as PipelineUpdateContext)
                }
                finally {
                  _currentState.isDataAssetRequestPending = false
                }
              })
            }
          }
        },
      )
      // NOTE(review): only the headers watcher is stopped explicitly below; the stop handles of the
      // next three watchers are not kept — confirm they are always created inside a component scope
      watch(featuresMappingUserIdentifierColumn, runColumnMappingConfigProcessing)
      watch(joinedDataAssetColumns, runColumnMappingConfigProcessing)
      watch(getPipelinePreprocessConfig, async () => {
        if (currentDataAssetWrapper.value?.key && haveHeadersLoaded.value) {
          store.commit('pipeline/prepareAssetFields', currentDataAssetWrapper.value)
        }
      })

      onBeforeUnmount(() => {
        if (typeof _currentState.assetHeadersWatcher === 'function') {
          _currentState.assetHeadersWatcher()
        }
        _currentState.assetHeadersWatcher = null
      })
    }

    // resets the form and reloads file info, asset values and column data, in that order
    const prepareDataAsset = async () => {
      clearForm()
      await updateAssetDataFileInfo(withoutBlocking)
      setAssetValues()
      await getAssetDataFields(withoutBlocking)
      setColumnDependedValues()
      validateFields()
    }

    try {
      parallelThreads.push(prepareDataAsset())

      // best effort: a failed loader must not prevent the remaining steps from running
      await Promise.allSettled(parallelThreads)

      if (onAssetLoaded) {
        // fire-and-forget: the callback result is not awaited, failures are only logged
        Promise.resolve(onAssetLoaded(currentDataAssetWrapper.value?.asset))
          .catch((error: any) => {
            console.error(error.message)
          })
      }

      if (currentDataAssetWrapper.value && _currentState.assetWrapperOriginalCopy) {
        const rawCopy = toRaw(_currentState.assetWrapperOriginalCopy.asset)
        const sortedAssetObject = getSortedObjectByExample(cloneDeep(toRaw(currentDataAssetWrapper.value.asset)), rawCopy)

        // persist only when loading actually changed the asset compared to its original copy
        if (rawCopy && sortedAssetObject && !isEqual(rawCopy, sortedAssetObject)) {
          await store.dispatch('pipeline/updatePipeline')
        }
      }
    }
    finally {
      // never leave the form in a "pending" state, even when a step above throws
      _currentState.isDataAssetRequestPending = false
    }
  }

  /**
   * One-time initialization of the composable.
   * Loads the asset immediately when the wrapper is already available; otherwise waits for the
   * wrapper to change, loads the asset and then stops waiting.
   */
  const firstLoad = async () => {
    if (_currentState.initialized) { return }

    _currentState.initialized = true

    if (currentDataAssetWrapper.value) {
      await initDataAsset()
      return
    }

    const stopWaitingForAsset = watch(currentDataAssetWrapper, async () => {
      await initDataAsset()
      stopWaitingForAsset()
    })
  }

  // the first load is triggered without awaiting its result
  if (autoload) {
    void firstLoad()
  }

  /**
   * Applies a new asset type to the form state and updates the values derived from it
   * (partition column and, once primary keys exist, the primary key columns).
   */
  const assetTypeChangeHandler = (value: DATA_ASSET_TYPES) => {
    _currentState.assetType = value
    onAssetFieldValueChange(_currentState.assetTypeError)

    // snapshots are always partitioned by OFFERFIT_RECEIVED_TS_COLUMN,
    // for every other type the user has to pick the partition column
    const isSnapshot = _currentState.assetType === DATA_ASSET_TYPES.SNAPSHOT
    _currentState.assetPartitionColumn = isSnapshot ? OFFERFIT_RECEIVED_TS_COLUMN : undefined
    onAssetFieldValueChange()

    // primary keys are only defined after the initial creation of the asset
    const hasPrimaryKeys = primaryKeyColumns.value !== undefined
    if (hasPrimaryKeys) {
      updatePrimaryKeysBasedOnAssetType()
    }
  }

  /**
   * Recomputes `pk_cols` of the current asset for the selected asset type and stores the result.
   *
   * - SNAPSHOT: the existing primary keys plus `OFFERFIT_RECEIVED_TS_COLUMN` (added only when missing)
   * - any other type: the existing primary keys without `OFFERFIT_RECEIVED_TS_COLUMN`
   *
   * NOTE(review): an earlier comment also mentioned removing OFFERFIT_UPDATED_TS_COLUMN for
   * non-snapshot assets; this function has never filtered that column — confirm whether it should.
   */
  const updatePrimaryKeysBasedOnAssetType = () => {
    const currentPrimaryKeys: string[] = primaryKeyColumns.value || []
    let primaryKeyCols: string[]

    if (_currentState.assetType === DATA_ASSET_TYPES.SNAPSHOT) {
      // keep the existing keys when the timestamp column is already part of them;
      // previously this case stored an empty list and wiped every primary key
      primaryKeyCols = currentPrimaryKeys.includes(OFFERFIT_RECEIVED_TS_COLUMN)
        ? [...currentPrimaryKeys]
        : currentPrimaryKeys.concat([OFFERFIT_RECEIVED_TS_COLUMN])
    }
    else {
      primaryKeyCols = currentPrimaryKeys.filter((item: string) => item !== OFFERFIT_RECEIVED_TS_COLUMN)
    }

    const payload: StoreDataChangePayload = {
      fieldObject: currentDataAssetWrapper.value?.asset,
      key: 'pk_cols',
      value: primaryKeyCols,
    }
    store.commit('updateObjectInStore', payload)
    debouncedFieldChange()
  }

  // Public API of the composable: every field of the reactive state exposed as an individual ref,
  // followed by the derived values, loaders and handlers defined above.
  return {
    ...toRefs(_currentState),

    assetSlugName,
    options,
    dataAssetColumns,
    dataAssetColumnsOptions,
    customerIdentifierColNameDoesExistInAsset,
    eventTimeColNameDoesExistInAsset,
    isDataAssetFormBusy,
    targetFileInfo,
    sampleDataFormatsOptions,
    isFileCannotBeFound,
    partitionColumn,
    assetTypeChangeHandler,

    setAssetValues,
    setColumnDependedValues,
    validateFields,

    initDataAsset,

    addNewAsset,
    updateAsset,
    clearForm,
    getSampleDataFileInfoFromBucket,
    loadSourceSampleDataRows,
    rawDataAssetRowsFromBQ,
    getDatasetFileInfoFromBucket,
    getAssetDataFields,
    getSampleDataForColumn,
    updateAssetDataFileInfo,
    addUserIdentifierToMapping,

    onAssetFieldValueChange,
    wantToUseClickHandler,
    assetFilePathFormatChangeHandler,
    dropOverUploaderAreaHandler,
    inputFileChangeHandler,
    assetDataFormatFieldChangeHandler,
    userIdentifierSelectEmitHandler,
    updateFeatureConfigWithRenamedColumnMapping,
    getCustomerIdentifierOriginalColumnName,
    renameCustomerIdInPreprocessing,
    createItemInAssetPreprocessConfig,
    updatePrimaryKeysBasedOnAssetType,
    featuresMappingUserIdentifierColumn,

    getDataAssetFieldTypes,
    // ref view onto the state's `assetWrapperOriginalCopy` field
    loadedDataAsset: toRef(_currentState, 'assetWrapperOriginalCopy'),

    getEventTimeColName,

    pipelineConfig,
    engineerConfig,

    assetKey: currentDataAssetKey,
    // the whole reactive state object, in addition to the per-field refs spread above
    state: _currentState,
  }
}

/** Shape of the object returned by `useAssetData`. */
export type AssetDataReactive = ReturnType<typeof useAssetData>

/**
 * Creates a `useAssetData` instance backed by its own, freshly created state object
 * and points it at the given asset.
 *
 * @param assetKey key assigned to the instance's `pipelineConfig.currentDataAssetKey`
 * @param readOnly forwarded to `useAssetData` as the `readOnly` option
 * @returns the configured asset data instance
 */
export function getIsolatedAssetDataConfig(assetKey: string, readOnly: boolean = false): AssetDataReactive {
  const isolatedState = createReactiveDataAssetStateObject()
  const isolatedInstance = useAssetData({ defaultStateObject: isolatedState, readOnly })

  isolatedInstance.pipelineConfig.currentDataAssetKey.value = assetKey

  return isolatedInstance
}

/**
 * Module-level reactive registry of `AssetDataReactive` instances.
 * NOTE(review): presumably keyed by asset key — confirm against the code that fills it.
 */
export const globalDataAssetReactiveMap = reactive<Record<string, AssetDataReactive>>({})
