Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@
"title": "Mark {{type}} as {{state}}"
},
"options": {
"clearAllMappedTasks": "Clear all mapped task instances",
"downstream": "Downstream",
"existingTasks": "Clear existing tasks",
"future": "Future",
"markAllMappedTasks": "Mark all mapped task instances",
"onlyFailed": "Clear only failed tasks",
"past": "Past",
"preventRunningTasks": "Prevent rerun if task is running",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { getRelativeTime } from "src/utils/datetimeUtils";

type Props = {
readonly dagDetails?: {
clearAllMapped?: boolean;
dagId: string;
dagRunId: string;
downstream?: boolean;
Expand Down Expand Up @@ -67,7 +68,10 @@ const ClearTaskInstanceConfirmationDialog = ({
include_past: dagDetails?.past,
include_upstream: dagDetails?.upstream,
only_failed: dagDetails?.onlyFailed,
task_ids: [[dagDetails?.taskId ?? "", dagDetails?.mapIndex ?? 0]],
task_ids:
dagDetails?.clearAllMapped && dagDetails.mapIndex !== undefined && dagDetails.mapIndex > -1
? [dagDetails.taskId]
: [[dagDetails?.taskId ?? "", dagDetails?.mapIndex ?? 0]],
},
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ const ClearTaskInstanceDialog = ({ onClose: onCloseDialog, open: openDialog, tas
const downstream = selectedOptions.includes("downstream");
const [runOnLatestVersion, setRunOnLatestVersion] = useState(false);
const [preventRunningTask, setPreventRunningTask] = useState(true);
const [clearAllMapped, setClearAllMapped] = useState(false);
const isMapped = mapIndex > -1;

const [note, setNote] = useState<string | null>(taskInstance.note);
const { isPending: isPendingPatchDagRun, mutate: mutatePatchTaskInstance } = usePatchTaskInstance({
Expand Down Expand Up @@ -98,7 +100,7 @@ const ClearTaskInstanceDialog = ({ onClose: onCloseDialog, open: openDialog, tas
include_upstream: upstream,
only_failed: onlyFailed,
run_on_latest_version: runOnLatestVersion,
task_ids: [[taskId, mapIndex]],
task_ids: isMapped && clearAllMapped ? [taskId] : [[taskId, mapIndex]],
},
});

Expand Down Expand Up @@ -171,6 +173,7 @@ const ClearTaskInstanceDialog = ({ onClose: onCloseDialog, open: openDialog, tas
<ActionAccordion affectedTasks={affectedTasks} note={note} setNote={setNote} />
<Flex
{...(shouldShowBundleVersionOption ? { alignItems: "center" } : {})}
gap={4}
justifyContent={shouldShowBundleVersionOption ? "space-between" : "end"}
mt={3}
>
Expand All @@ -182,6 +185,14 @@ const ClearTaskInstanceDialog = ({ onClose: onCloseDialog, open: openDialog, tas
{translate("dags:runAndTaskActions.options.runOnLatestVersion")}
</Checkbox>
) : undefined}
{isMapped ? (
<Checkbox
Comment thread
haseebmalik18 marked this conversation as resolved.
checked={clearAllMapped}
onCheckedChange={(event) => setClearAllMapped(Boolean(event.checked))}
>
{translate("dags:runAndTaskActions.options.clearAllMappedTasks")}
</Checkbox>
) : undefined}
<Checkbox
checked={preventRunningTask}
onCheckedChange={(event) => setPreventRunningTask(Boolean(event.checked))}
Expand All @@ -204,6 +215,7 @@ const ClearTaskInstanceDialog = ({ onClose: onCloseDialog, open: openDialog, tas
{open ? (
<ClearTaskInstanceConfirmationDialog
dagDetails={{
clearAllMapped: isMapped && clearAllMapped,
dagId,
dagRunId,
downstream,
Expand All @@ -227,7 +239,7 @@ const ClearTaskInstanceDialog = ({ onClose: onCloseDialog, open: openDialog, tas
include_upstream: upstream,
only_failed: onlyFailed,
run_on_latest_version: runOnLatestVersion,
task_ids: [[taskId, mapIndex]],
task_ids: isMapped && clearAllMapped ? [taskId] : [[taskId, mapIndex]],
...(preventRunningTask ? { prevent_running_task: true } : {}),
},
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import type { TaskInstanceResponse, TaskInstanceState } from "openapi/requests/t
import { ActionAccordion } from "src/components/ActionAccordion";
import { StateBadge } from "src/components/StateBadge";
import Time from "src/components/Time";
import { Dialog } from "src/components/ui";
import { Checkbox, Dialog } from "src/components/ui";
import SegmentedControl from "src/components/ui/SegmentedControl";
import { usePatchTaskInstance } from "src/queries/usePatchTaskInstance";
import { usePatchTaskInstanceDryRun } from "src/queries/usePatchTaskInstanceDryRun";
Expand All @@ -44,6 +44,8 @@ const MarkTaskInstanceAsDialog = ({ onClose, open, state, taskInstance }: Props)
const { t: translate } = useTranslation();

const [selectedOptions, setSelectedOptions] = useState<Array<string>>([]);
const [markAllMapped, setMarkAllMapped] = useState(false);
const isMapped = mapIndex > -1;

const past = selectedOptions.includes("past");
const future = selectedOptions.includes("future");
Expand All @@ -52,17 +54,19 @@ const MarkTaskInstanceAsDialog = ({ onClose, open, state, taskInstance }: Props)

const [note, setNote] = useState<string | null>(taskInstance.note);

const effectiveMapIndex = isMapped && markAllMapped ? undefined : mapIndex;

const { isPending, mutate } = usePatchTaskInstance({
dagId,
dagRunId,
mapIndex,
mapIndex: effectiveMapIndex,
onSuccess: onClose,
taskId,
});
const { data, isPending: isPendingDryRun } = usePatchTaskInstanceDryRun({
dagId,
dagRunId,
mapIndex,
mapIndex: effectiveMapIndex,
options: {
enabled: open,
refetchOnMount: "always",
Expand Down Expand Up @@ -134,14 +138,23 @@ const MarkTaskInstanceAsDialog = ({ onClose, open, state, taskInstance }: Props)
</Flex>
<ActionAccordion affectedTasks={affectedTasks} note={note} setNote={setNote} />
<Flex justifyContent="end" mt={3}>
{isMapped ? (
<Checkbox
checked={markAllMapped}
mr="auto"
onCheckedChange={(event) => setMarkAllMapped(Boolean(event.checked))}
>
{translate("dags:runAndTaskActions.options.markAllMappedTasks")}
</Checkbox>
) : undefined}
<Button
colorPalette="brand"
loading={isPending || isPendingDryRun}
onClick={() => {
mutate({
dagId,
dagRunId,
mapIndex,
mapIndex: effectiveMapIndex,
requestBody: {
include_downstream: downstream,
include_future: future,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export const usePatchTaskInstance = ({
}: {
dagId: string;
dagRunId: string;
mapIndex: number;
mapIndex?: number;
onSuccess?: () => void;
taskId: string;
}) => {
Expand All @@ -60,7 +60,9 @@ export const usePatchTaskInstance = ({
const onSuccessFn = async () => {
const queryKeys = [
UseTaskInstanceServiceGetTaskInstanceKeyFn({ dagId, dagRunId, taskId }),
UseTaskInstanceServiceGetMappedTaskInstanceKeyFn({ dagId, dagRunId, mapIndex, taskId }),
...(mapIndex === undefined
? []
: [UseTaskInstanceServiceGetMappedTaskInstanceKeyFn({ dagId, dagRunId, mapIndex, taskId })]),
[useTaskInstanceServiceGetTaskInstancesKey],
[usePatchTaskInstanceDryRunKey, dagId, dagRunId, { mapIndex, taskId }],
[useClearTaskInstancesDryRunKey, dagId],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import type { PatchTaskInstanceBody, PatchTaskInstanceDryRunResponse } from "ope
type Props<TData, TError> = {
dagId: string;
dagRunId: string;
mapIndex: number;
mapIndex?: number;
options?: Omit<UseQueryOptions<TData, TError>, "queryFn" | "queryKey">;
requestBody: PatchTaskInstanceBody;
taskId: string;
Expand Down
Loading