























































































































































































































































































































import * as R from 'ramda';
import { defineComponent, computed, ref, onMounted, onUnmounted, watch, Ref } from '@vue/composition-api';
import { ValidationObserver, ValidationProvider, extend } from 'vee-validate';
import { OrbitSpinner } from 'epic-spinners';
import { useAxios, useFeatureFlags, useQuery, useResult } from '@/app/composable';
import { FormBlock } from '@/app/components';
import { useErrors } from '@/app/composable/errors';
import { HarvesterSourceType, StatusCode } from '@/modules/data-checkin/constants';
import GET_JOB_WITH_STEPS from '../../graphql/getJobWithEnabledSteps.graphql';
import { JobsAPI, ModelAPI } from '../../api';
import { MonitoringAPI } from '@/app/api';
import { useStep } from '../../composable/steps';
import { AssetsAPI } from '../../../asset/api';
import dayjs from 'dayjs';
import { S } from '@/app/utilities';
import { useJobConfiguration } from '../../composable/config-extraction';
import StepCompletionModal from '../../components/StepCompletionModal.vue';
import WizardActions from '../../components/WizardActions.vue';
import { maxLengthValidator, localPathValidator, requiredValidator, minLengthValidator } from '@/app/validators';
import { regex } from 'vee-validate/dist/rules';
import { WorkflowStatus } from '@/modules/apollo/constants';
import { DataCheckinJob, DataCheckinJobStep } from '../../types';

// The stock vee-validate `regex` rule, carrying a message specific to title fields.
const titleRegexRule = {
    ...regex,
    message: 'Title must contain only alphanumeric characters, dashes, underscores, spaces and at least one letter.',
};

// Register the validation rules with vee-validate under the names 'required', 'max', 'min', 'path' and 'regex'.
extend('required', requiredValidator);
extend('max', maxLengthValidator);
extend('min', minLengthValidator);
extend('path', localPathValidator);
extend('regex', titleRegexRule);

export default defineComponent({
    name: 'Loader',
    /**
     * Builds the page meta information. The title is suffixed with the job name
     * as soon as a job is available on the component instance.
     */
    metaInfo() {
        const { job } = this as any;
        const jobSuffix = job ? ` for: ${job.name}` : '';
        return { title: `Data Loading${jobSuffix}` };
    },
    props: {
        // Identifier of the data check-in job; accepted as a number or a string and converted to a number in `setup`.
        id: {
            type: [Number, String],
            required: true,
        },
        // JSON-encoded query object; it is parsed and passed as the `query` when navigating to 'data-checkin-jobs'.
        queryParams: {
            type: String,
            default: '{}',
        },
    },
    // Components registered locally on this component.
    components: {
        FormBlock,
        OrbitSpinner,
        ValidationProvider,
        ValidationObserver,
        WizardActions,
        StepCompletionModal,
    },
    setup(props, { root }: any) {
        const { isEnabled } = useFeatureFlags();

        // The `id` prop may arrive as a string or a number; normalise it to a base-10 integer.
        const jobId = Number.parseInt(String(props.id), 10);

        // Reference on which `validate()` is called when saving.
        const loaderRef: Ref<any> = ref(null);
        // State of the completion modal shown after the step has been finalised.
        const showFinalizeModal = ref(false);
        const nextStep: Ref<any> = ref(null);
        // Value forwarded to `JobsAPI.reset` when a suspended job is saved.
        const kafkaMessageDeletion: Ref<any> = ref(null);
        // Harvester and mapping steps of the job; filled in by `fetchSteps`.
        const harvester: Ref<any> = ref(null);
        const mapping: Ref<any> = ref(null);

        // Fetch job information
        const { checkGQLAuthentication } = useErrors(root.$route);
        const jobQuery = useQuery(GET_JOB_WITH_STEPS, { id: jobId }, { fetchPolicy: 'no-cache' });
        const { loading: jobLoading, error: jobError, result, onError } = jobQuery;
        onError(checkGQLAuthentication);
        const job = useResult(result, null, (data: any) => data.job);

        // True when the loaded job has a non-null `runnerId`; false while no job is available.
        const isOnPremise = computed(() => (job.value ? job.value.runnerId !== null : false));

        /**
         * Creates a computed flag that tells whether the job contains a step whose
         * step type has the given name. The flag is false while the job is not loaded.
         *
         * @param stepTypeName name of the step type to look for (e.g. 'mapping')
         * @returns a computed boolean
         */
        const hasStepOfType = (stepTypeName: string) =>
            computed((): boolean => {
                if (!job.value) {
                    return false;
                }
                return job.value.dataCheckinJobSteps.some(
                    (jobStep: DataCheckinJobStep) => jobStep.dataCheckinStepType?.name === stepTypeName,
                );
            });

        // One flag per optional step type that influences how the dataset is built.
        const hasMapping = hasStepOfType('mapping');
        const hasEncryption = hasStepOfType('encryption');
        const hasCleaning = hasStepOfType('cleaning');
        const hasAnonymisation = hasStepOfType('anonymiser');

        // Fetch loader configuration
        // Draft of the dataset asset; it is replaced by the fetched asset when the loader step already has a
        // configuration.
        const newAsset = ref({ name: '', description: null, assetTypeId: 1 }); // assetTypeId 1 should be Dataset
        // The loader step of this job; stays null until the step has been fetched.
        const step: Ref<any> = ref(null);

        // True while the step is not finalized and the asset name or description holds non-blank text.
        const hasChanges = computed(() => {
            const isFilledIn = (text: any) => !R.isNil(text) && R.trim(text) !== '';
            return !isFinalized.value && (isFilledIn(newAsset.value.name) || isFilledIn(newAsset.value.description));
        });

        // The step can be finalised when name and description are filled in, a Kafka message deletion choice exists
        // for suspended Kafka jobs, and the step is loaded and not deprecated.
        const canFinalize = computed(() => {
            const isBlank = (text: any) => R.isNil(text) || R.trim(text) === '';

            if (isBlank(newAsset.value.name) || isBlank(newAsset.value.description)) {
                return false;
            }
            if (isKafkaAndSuspended.value && R.isNil(kafkaMessageDeletion.value)) {
                return false;
            }
            return !R.isNil(step.value) && step.value.status !== StatusCode.Deprecated;
        });

        // True for a suspended (internal or external) streaming job whose retrieval end date has already passed.
        const expiredRetrieveUntilDate = computed(() => {
            // add 1 extra day in order to make it inclusive
            const retrieveUntil = dayjs(harvester.value?.configuration?.retrieval?.endDate).add(1, 'day');
            const isSuspendedStream = isStreamingAndSuspended.value || isExternalStreamingAndSuspended.value;
            return isSuspendedStream && retrieveUntil.isBefore(dayjs().utc());
        });

        // Step helpers created from the loader step ref, the job and the root instance.
        const { isConfigEmpty, isFinalized, getNextStep, canRestart } = useStep(step, job, root);
        // Request executor plus its loading / error state.
        // NOTE(review): the meaning of the `true` argument is defined by `useAxios` — confirm there.
        const { loading, error, exec } = useAxios(true);

        // Load the loader step. An empty configuration is seeded with defaults for creating a new cloud dataset;
        // otherwise the asset referenced by the stored configuration is fetched and used as the edited asset.
        exec(JobsAPI.getStep(jobId, 'loader')).then(async (response: any) => {
            const loaderStep = response.data;

            if (isConfigEmpty(loaderStep.configuration)) {
                step.value = {
                    ...loaderStep,
                    configuration: {
                        ...loaderStep.configuration,
                        database: null,
                        collection: null,
                        target: 'create',
                        location: 'cloud',
                        outputPath: null,
                    },
                };
                return;
            }

            try {
                const assetResponse = (await exec(AssetsAPI.getAsset(loaderStep.configuration.collection))) as any;
                newAsset.value = assetResponse.data;
                step.value = { ...loaderStep };
            } catch (e: any) {
                // Only a 403 is reported here; any other failure leaves the step unset.
                if (e.response?.status !== 403) {
                    return;
                }
                root.$toastr.e(
                    'You do not have access to the specific step until the dataset becomes available (i.e. once its metadata are filled in).',
                    'Access Denied',
                );
                root.$router
                    .push({ name: 'data-checkin-jobs', query: JSON.parse(props.queryParams) })
                    .catch(() => null);
            }
        });

        // Busy while either the request executor or the job query is loading.
        const isLoading = computed(() => {
            return loading.value || jobLoading.value;
        });

        // True when the step configuration stores the asset at the 'local' location.
        const assetStoredLocally = computed(() => {
            const location = step.value?.configuration?.location;
            return location === 'local';
        });

        // Navigates to the jobs list, restoring the query passed in through the `queryParams` prop.
        const cancel = () => {
            const query = JSON.parse(props.queryParams);
            return root.$router.push({ name: 'data-checkin-jobs', query });
        };

        const { getStructure, getUnmappedStructure, getLoaderSchema } = useJobConfiguration(job);

        /**
         * Returns the anonymisation fields of type 'identifier' whose method is 'drop'.
         * Resolves to an empty array when the job has no anonymiser step or that step has no configuration.
         */
        const getDroppedFields = async () => {
            if (!hasAnonymisation.value) {
                return [];
            }

            const anonymisationResponse = (await exec(JobsAPI.getStep(jobId, 'anonymiser'))) as any;
            const anonymisation = anonymisationResponse.data;
            if (isConfigEmpty(anonymisation.configuration)) {
                return [];
            }

            const isDroppedIdentifier = (field: any) =>
                field.anonymisationType === 'identifier' && field.options.anonymisationMethod === 'drop';
            return anonymisation.configuration.fields.filter(isDroppedIdentifier);
        };

        /**
         * Assembles the dataset asset payload for this job and submits it through `AssetsAPI.createAsset`.
         *
         * The payload starts from `newAsset` and is enriched with the structure derived from the harvester (and
         * mapping, when present) configuration, the processing rules of the mapping / cleaning / encryption steps,
         * and distribution metadata derived from the structure type and source.
         *
         * @returns the promise returned by `exec` for the create request
         */
        const createAsset = async () => {
            const dataset: any = {
                ...newAsset.value,
                standard: null,
                metadata: {
                    distribution: {
                        format: null,
                        accessibility: null,
                        accrualMethod: null,
                        accrualPeriodicity: null,
                        storage: assetStoredLocally.value ? 'onPremise' : 'cloud',
                    },
                    runnerId: job.value.runnerId,
                    // The output path is only recorded for locally stored assets.
                    path: assetStoredLocally.value ? step.value.configuration.outputPath : null,
                    filenames: [],
                    provenance: {
                        id: jobId,
                        type: 'data-checkin',
                        name: job.value.name,
                    },
                },
                structure: null,
                volume: { value: 0, unit: '' },
                processingRules: {},
                accessLevel: job.value.accessLevel,
            };

            if (hasMapping.value) {
                const droppedFields = await getDroppedFields();
                // NOTE(review): when the mapping configuration is empty, `dataset.structure` stays null and the
                // `switch (dataset.structure.type)` further down would throw — confirm this state is unreachable.
                if (!isConfigEmpty(mapping.value.configuration)) {
                    // Ids of every mapped target concept and its parents (nulls removed), followed by the domain
                    // and primary concept ids.
                    const conceptIds = {
                        conceptIds: mapping.value.configuration.fields
                            .flatMap((field: any) => [...field.target.parentIds, field.target.id])
                            .filter((fieldId: any) => fieldId !== null),
                    };
                    conceptIds.conceptIds.push(mapping.value.configuration.domain.id); // domain id
                    conceptIds.conceptIds.push(mapping.value.configuration.concept.id); // primary concept id
                    const { data: conceptUids } = (await exec(ModelAPI.conceptsUids(conceptIds))) as any;
                    dataset.standard = mapping.value.configuration.standard;
                    dataset.structure = await getStructure(
                        harvester.value.configuration,
                        mapping.value.configuration,
                        conceptUids,
                        droppedFields,
                    );
                    // NOTE(review): the switch on `dataset.structure.type` below assigns the unit again for the
                    // 'json' and 'other' types.
                    dataset.volume.unit = dataset.structure.type === 'json' ? 'records' : 'files';
                    // Keep only the mapping fields that have a truthy target id.
                    dataset.processingRules.mappingRules = mapping.value.configuration.fields.filter((obj: any) => {
                        return 'target' in obj && 'id' in obj.target && obj.target.id;
                    });
                    dataset.processingRules.alternateNaming = mapping.value.configuration.alternateNaming ?? null;
                }
            } else {
                dataset.structure = await getUnmappedStructure(harvester.value.configuration);
            }

            // Cleaning rules are copied only when the cleaning step exists and has a configuration.
            if (hasCleaning.value) {
                const { data: cleaning } = (await exec(JobsAPI.getStep(jobId, 'cleaning'))) as any;
                if (!isConfigEmpty(cleaning.configuration)) {
                    dataset.processingRules.cleaningRules = cleaning.configuration.fields;
                }
            }

            // Every field configured in the encryption step is recorded as an encrypted field.
            if (hasEncryption.value) {
                const { data: encryption } = (await exec(JobsAPI.getStep(jobId, 'encryption'))) as any;
                if (!isConfigEmpty(encryption.configuration)) {
                    dataset.processingRules.encryptionRules = encryption.configuration.fields.map((field: any) => {
                        return {
                            id: field.id,
                            uid: field.uid,
                            index: field.index,
                            encrypt: true,
                        };
                    });
                }
            }

            // Derive the distribution format and the volume unit from the structure type.
            switch (
                dataset.structure.type // This is not to be removed. Remove ONLY the .distribution.type not the structure.type
            ) {
                case 'json':
                    if (dataset.structure.dataType === 'textBinary') {
                        dataset.metadata.distribution.format = ['JSON and Binary'];
                    } else {
                        dataset.metadata.distribution.format = ['JSON'];
                    }
                    dataset.volume.unit = 'records';
                    break;
                case 'other':
                    dataset.volume.unit = 'files';
                    break;
                default:
                // do nothing
            }
            // Derive the accrual method and accessibility from the harvester source type.
            switch (dataset.structure.source) {
                case HarvesterSourceType.File:
                case HarvesterSourceType.LargeFiles:
                    dataset.metadata.distribution.accrualMethod = 'By uploading updated/revised file(s)';
                    // Structures of type 'other' or without a domain are offered as a download instead of an API.
                    if (dataset.structure.type === 'other' || !dataset.structure.domain) {
                        dataset.metadata.distribution.accessibility = assetStoredLocally.value
                            ? ['Limited to locally executed pipelines only']
                            : ['As a downloadable file'];
                    } else {
                        dataset.metadata.distribution.accessibility = assetStoredLocally.value
                            ? ['Limited to locally executed pipelines only']
                            : ['Through an API'];
                    }
                    break;
                case HarvesterSourceType.Api:
                case HarvesterSourceType.InternalApi:
                    dataset.metadata.distribution.accrualMethod = 'Through an API';
                    dataset.metadata.distribution.accessibility = assetStoredLocally.value
                        ? ['Limited to locally executed pipelines only']
                        : ['Through an API'];
                    break;
                case HarvesterSourceType.Kafka:
                case HarvesterSourceType.ExternalKafka:
                case HarvesterSourceType.MQTT:
                case HarvesterSourceType.ExternalMQTT:
                    dataset.metadata.distribution.accrualMethod = 'Through a data streaming mechanism';
                    dataset.metadata.distribution.accessibility = ['Through an API'];
                    // Streaming access is advertised only when the 'retrieve.stream' feature flag is enabled.
                    if (isEnabled('retrieve.stream')) {
                        dataset.metadata.distribution.accessibility.push('Through a data streaming mechanism');
                    }
                    break;
                default:
                // do nothing
            }

            dataset.createdById = job.value.createdBy.id;

            return exec(AssetsAPI.createAsset(dataset));
        };

        /**
         * Validates the form and, if it is valid and the step is not deprecated, finalises the loader step.
         *
         * For a suspended workflow the executions and the job are reset first (aborting when the retrieve-until
         * date has expired or a reset fails). For the 'create' target the dataset asset is created and the step
         * and job are updated with it. Finally the step is finalised and the completion modal is opened.
         */
        const save = async () => {
            const valid = await loaderRef.value.validate();
            if (!valid || step.value.status === StatusCode.Deprecated) {
                return;
            }

            let assetId: number | undefined;

            if (job.value.workflow.status === WorkflowStatus.Suspended) {
                if (expiredRetrieveUntilDate.value) {
                    root.$toastr.e(
                        'Retrieve Until Date is in the past. Please update it accordingly to continue.',
                        'Invalid Retrieve Until Date',
                    );
                    return;
                }

                // Both reset requests are started before either one is awaited.
                const resetRequests = [
                    exec(MonitoringAPI.resetExecutions(job.value.workflowId)),
                    exec(JobsAPI.reset(job.value.id, kafkaMessageDeletion.value)),
                ];

                try {
                    await Promise.all(resetRequests);
                } catch (e: any) {
                    const failureMessage =
                        e.response?.status === 500
                            ? 'Service is unavailable at the moment. Please try again later.'
                            : 'Error deleting metadata';
                    root.$toastr.e(failureMessage, 'Error');
                    return;
                }
            }

            // Add correct database type
            if (hasMapping.value) {
                step.value.configuration.database = process.env.VUE_APP_LOADER_DATABASE;
            }

            if (step.value.configuration.target === 'create') {
                try {
                    const createdResponse = (await createAsset()) as any;
                    const asset = createdResponse.data;
                    assetId = asset.id;
                    step.value.configuration.collection = asset.id;
                    step.value.configuration.schema = getLoaderSchema(asset.structure, mapping.value?.configuration);
                    step.value.configuration.hasEncryption = hasEncryption.value;
                } catch (e) {
                    root.$toastr.e('Error creating dataset', 'Error');
                    return;
                }

                // Save configuration
                const stepUpdate = {
                    configuration: step.value.configuration,
                    serviceVersion: process.env.VUE_APP_LOADER_VERSION,
                };
                await exec(JobsAPI.updateStep(step.value.id, stepUpdate));

                const jobUpdate = { id: job.value.id, name: job.value.name, assetId } as DataCheckinJob;
                await exec(JobsAPI.update(jobId, jobUpdate));
            } else if (step.value.configuration.target === 'update') {
                // TODO: Handle update
            }

            // Finalisation is started but not awaited; the modal opens once the next step has been resolved.
            const openCompletionModal = (stepTypeResponse: any) => {
                showFinalizeModal.value = true;
                nextStep.value = stepTypeResponse;
            };
            const onFinalized = () => {
                getNextStep().then(openCompletionModal);
            };
            const onFinalizeFailed = () => {
                root.$toastr.e('Error finalising step', 'Error');
            };

            exec(JobsAPI.finalize(step.value.id)).then(onFinalized).catch(onFinalizeFailed);
        };

        /**
         * Restarts the loader step and finalises it again, then notifies the user and navigates to the jobs list.
         *
         * The two requests run sequentially and are both awaited, so the returned promise settles only after the
         * whole operation has completed. A failure of either request is reported with an error toast (previously a
         * failed restart request produced an unhandled rejection and no feedback).
         */
        const restartStep = async () => {
            try {
                await exec(JobsAPI.restartStep(step.value.id));
                await exec(JobsAPI.finalize(step.value.id));
            } catch (e) {
                root.$toastr.e('Restarting of the Data Check-in Pipeline failed', 'Failed');
                return;
            }

            root.$toastr.s(
                `Data Check-in Pipeline "${S.sanitizeHtml(job.value.name)}" is now restarting.`,
                'Success',
            );
            root.$router.push({ name: 'data-checkin-jobs', query: JSON.parse(props.queryParams) });
        };

        // Sends the unlock request for the job identified by the `id` prop.
        const unlockJob = async () => {
            const numericJobId = Number(props.id);
            await exec(JobsAPI.unlock(numericJobId));
        };

        // Loads the harvester step and, when the job has a mapping step, the mapping step as well.
        const fetchSteps = async () => {
            const harvesterResponse = (await exec(JobsAPI.getStep(jobId, 'harvester'))) as any;
            harvester.value = harvesterResponse.data;

            if (!hasMapping.value) {
                return;
            }

            const mappingResponse = (await exec(JobsAPI.getStep(jobId, 'mapping'))) as any;
            mapping.value = mappingResponse.data;
        };

        // True when the job's workflow status is Suspended.
        const isSuspended = computed(() => {
            const workflowStatus = job.value?.workflow?.status;
            return workflowStatus === WorkflowStatus.Suspended;
        });

        // Suspended job whose harvester source is Kafka.
        const isKafkaAndSuspended = computed(() => {
            const source = harvester.value?.configuration?.source;
            return source === HarvesterSourceType.Kafka && isSuspended.value;
        });

        // Suspended job whose harvester source is Kafka or MQTT.
        const isStreamingAndSuspended = computed(() => {
            const streamingSources = [HarvesterSourceType.Kafka, HarvesterSourceType.MQTT];
            return streamingSources.includes(harvester.value?.configuration?.source) && isSuspended.value;
        });

        // Suspended job whose harvester source is external Kafka or external MQTT.
        const isExternalStreamingAndSuspended = computed(() => {
            const externalStreamingSources = [HarvesterSourceType.ExternalKafka, HarvesterSourceType.ExternalMQTT];
            return externalStreamingSources.includes(harvester.value?.configuration?.source) && isSuspended.value;
        });

        // Unlock the job when the page is closed or reloaded while this component is mounted.
        onMounted(async () => {
            window.addEventListener('beforeunload', unlockJob);
        });

        // On unmount, detach the `beforeunload` listener registered on mount — otherwise it outlives the component
        // and sends another unlock request on a later page unload — and then unlock the job.
        onUnmounted(async () => {
            window.removeEventListener('beforeunload', unlockJob);
            unlockJob();
        });

        // Whenever the job changes to a truthy value, load the steps that depend on it.
        watch(
            () => job.value,
            (loadedJob) => {
                if (!loadedJob) {
                    return;
                }
                fetchSteps();
            },
        );

        // Bindings exposed by `setup`.
        return {
            // Constants
            StatusCode,

            // Job, harvester and loader step data
            job,
            jobError,
            jobLoading,
            harvester,
            step,
            newAsset,
            kafkaMessageDeletion,

            // Request state
            error,
            loading,
            isLoading,

            // Derived flags
            hasMapping,
            hasEncryption,
            hasChanges,
            canFinalize,
            canRestart,
            isFinalized,
            isOnPremise,
            assetStoredLocally,
            expiredRetrieveUntilDate,
            isKafkaAndSuspended,

            // Form reference and completion modal state
            loaderRef,
            nextStep,
            showFinalizeModal,

            // Actions
            cancel,
            save,
            restartStep,
        };
    },
});
