[Airflow] - How Airflow Identifies DAGs

https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dagfile-processing.html#dag-file-processing

 

DAG File Processing — Airflow Documentation

Location: airflow/dag_processing/manager.py

airflow.dag_processing.manager.DagFileProcessorManager

@attrs.define
class DagFileProcessorManager(LoggingMixin):
    """
    Manage processes responsible for parsing DAGs.

    Given a list of DAG definition files, this kicks off several processors
    in parallel to process them and put the results to a multiprocessing.Queue
    for DagFileProcessorAgent to harvest. The parallelism is limited and as the
    processors finish, more are launched. The files are processed over and
    over again, but no more often than the specified interval.

    :param max_runs: The number of times to parse each file. -1 for unlimited.
    :param processor_timeout: How long to wait before timing out a DAG file processor
    """

 

 

Location: airflow/dag_processing/processor.py

airflow.dag_processing.processor.DagFileProcessorProcess

@attrs.define(kw_only=True)
class DagFileProcessorProcess(WatchedSubprocess):
    """
    Parses dags with Task SDK API.

    This class provides a wrapper and management around a subprocess to parse a specific DAG file.

    Since DAGs are written with the Task SDK, we need to parse them in a task SDK process such that
    we can use the Task SDK definitions when serializing. This prevents potential conflicts with classes
    in core Airflow.
    """

 

 

@classmethod
def start(  # type: ignore[override]
    cls,
    *,
    path: str | os.PathLike[str],
    bundle_path: Path,
    callbacks: list[CallbackRequest],
    target: Callable[[], None] = _parse_file_entrypoint,
    **kwargs,
) -> Self:
    proc: Self = super().start(target=target, **kwargs)
    proc._on_child_started(callbacks, path, bundle_path)
    return proc

 

Following super().start(target=target, **kwargs) into the parent class, WatchedSubprocess.start:

@classmethod
def start(
    cls,
    *,
    target: Callable[[], None] = _subprocess_main,
    logger: FilteringBoundLogger | None = None,
    **constructor_kwargs,
) -> Self:
    """Fork and start a new subprocess with the specified target function."""
    # Create socketpairs/"pipes" to connect to the stdin and out from the subprocess
    child_stdin, feed_stdin = mkpipe(remote_read=True)
    child_stdout, read_stdout = mkpipe()
    child_stderr, read_stderr = mkpipe()

    # Open these socketpair before forking off the child, so that it is open when we fork.
    child_comms, read_msgs = mkpipe()
    child_logs, read_logs = mkpipe()

    pid = os.fork()
    if pid == 0:
        # Parent ends of the sockets are closed by the OS as they are set as non-inheritable

        # Python GC should delete these for us, but lets make double sure that we don't keep anything
        # around in the forked processes, especially things that might involve open files or sockets!
        del constructor_kwargs
        del logger

        try:
            # Run the child entrypoint
            _fork_main(child_stdin, child_stdout, child_stderr, child_logs.fileno(), target)
        except BaseException as e:
            try:
                # We can't use log here, as if we except out of _fork_main something _weird_ went on.
                print("Exception in _fork_main, exiting with code 124", e, file=sys.stderr)
            except BaseException as e:
                pass

        # It's really super super important we never exit this block. We are in the forked child, and if we
        # do then _THINGS GET WEIRD_.. (Normally `_fork_main` itself will `_exit()` so we never get here)
        os._exit(124)

    requests_fd = child_comms.fileno()

    # Close the remaining parent-end of the sockets we've passed to the child via fork. We still have the
    # other end of the pair open
    cls._close_unused_sockets(child_stdin, child_stdout, child_stderr, child_comms, child_logs)

    proc = cls(
        pid=pid,
        stdin=feed_stdin,
        process=psutil.Process(pid),
        requests_fd=requests_fd,
        **constructor_kwargs,
    )

    logger = logger or cast("FilteringBoundLogger", structlog.get_logger(logger_name="task").bind())
    proc._register_pipe_readers(
        logger=logger,
        stdout=read_stdout,
        stderr=read_stderr,
        requests=read_msgs,
        logs=read_logs,
    )

    return proc
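
The core trick here is to create the socket pairs before forking; afterwards, each side closes the ends it does not need and keeps the rest for communication. A standalone toy demonstration of that pattern, independent of Airflow's mkpipe helper:

# Toy demonstration of the fork + socketpair pattern used by start() above.
# This is not Airflow code; it only shows the parent/child plumbing idea.
import os
import socket

# Roughly what mkpipe() does: one end for the child, one for the parent.
child_end, parent_end = socket.socketpair()

pid = os.fork()
if pid == 0:
    # --- forked child ---
    parent_end.close()                    # child keeps only its own end
    child_end.sendall(b"hello from child\n")
    child_end.close()
    os._exit(0)                           # never fall out of the forked child

# --- parent ---
child_end.close()                         # parent keeps only its own end
data = parent_end.recv(1024)
print("parent received:", data.decode().strip())
os.waitpid(pid, 0)                        # reap the child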

 

Back to the default target, _parse_file_entrypoint, which is what actually runs in the forked child:

def _parse_file_entrypoint():
    import os

    import structlog

    from airflow.sdk.execution_time import task_runner
    from airflow.settings import configure_orm
    # Parse DAG file, send JSON back up!

    # We need to reconfigure the orm here, as DagFileProcessorManager does db queries for bundles, and
    # the session across forks blows things up.
    configure_orm()

    comms_decoder = task_runner.CommsDecoder[DagFileParseRequest, DagFileParsingResult](
        input=sys.stdin,
        decoder=TypeAdapter[DagFileParseRequest](DagFileParseRequest),
    )
    msg = comms_decoder.get_message()
    comms_decoder.request_socket = os.fdopen(msg.requests_fd, "wb", buffering=0)

    log = structlog.get_logger(logger_name="task")

    result = _parse_file(msg, log)
    if result is not None:
        comms_decoder.send_request(log, result)

 

The message type decoded here, decoder=TypeAdapter[DagFileParseRequest](DagFileParseRequest):

class DagFileParseRequest(BaseModel):
    """
    Request for DAG File Parsing.

    This is the request that the manager will send to the DAG parser with the dag file and
    any other necessary metadata.
    """

    file: str

    bundle_path: Path
    """Passing bundle path around lets us figure out relative file path."""

    requests_fd: int
    callback_requests: list[CallbackRequest] = Field(default_factory=list)
    type: Literal["DagFileParseRequest"] = "DagFileParseRequest"
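
Since this is a plain pydantic model, the message that travels from the manager to the child is just one line of JSON. A hedged sketch of what it looks like, using a toy copy of the model (all values below are hypothetical):

# Toy copy of DagFileParseRequest, only to show the serialized form the manager
# writes to the child's stdin. All values are hypothetical.
from pathlib import Path

from pydantic import BaseModel, Field

class ToyDagFileParseRequest(BaseModel):
    file: str
    bundle_path: Path
    requests_fd: int
    callback_requests: list = Field(default_factory=list)
    type: str = "DagFileParseRequest"

msg = ToyDagFileParseRequest(
    file="/opt/airflow/dags/my_dag.py",      # hypothetical DAG file
    bundle_path=Path("/opt/airflow/dags"),   # hypothetical bundle root
    requests_fd=7,                           # hypothetical fd for sending results back
)
print(msg.model_dump_json())
# {"file":"/opt/airflow/dags/my_dag.py","bundle_path":"/opt/airflow/dags","requests_fd":7,"callback_requests":[],"type":"DagFileParseRequest"}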

 

Then result = _parse_file(msg, log) builds the DagBag and produces the parsing result:

def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileParsingResult | None:
    # TODO: Set known_pool names on DagBag!
    bag = DagBag(
        dag_folder=msg.file,
        bundle_path=msg.bundle_path,
        include_examples=False,
        safe_mode=True,
        load_op_links=False,
    )
    if msg.callback_requests:
        # If the request is for callback, we shouldn't serialize the DAGs
        _execute_callbacks(bag, msg.callback_requests, log)
        return None

    serialized_dags, serialization_import_errors = _serialize_dags(bag, log)
    bag.import_errors.update(serialization_import_errors)
    dags = [LazyDeserializedDAG(data=serdag) for serdag in serialized_dags]
    result = DagFileParsingResult(
        fileloc=msg.file,
        serialized_dags=dags,
        import_errors=bag.import_errors,
        # TODO: Make `bag.dag_warnings` not return SQLA model objects
        warnings=[],
    )
    return result
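
DagBag can also be used directly, which is a convenient way to see what the child extracts from a file. A hedged sketch (the path is hypothetical and only a subset of the keyword arguments shown above is used):

# Hedged sketch: pointing DagBag at a single file, the same class _parse_file
# uses above.
from airflow.models import DagBag

bag = DagBag(
    dag_folder="/opt/airflow/dags/my_dag.py",  # a single file also works here
    include_examples=False,                    # skip the bundled example DAGs
    safe_mode=True,                            # heuristically skip files without "dag"/"airflow"
)

print(list(bag.dags))      # dag_ids discovered in the file
print(bag.import_errors)   # {fileloc: error message} for files that failed to import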

 

Finally, back in DagFileProcessorProcess.start, proc._on_child_started(callbacks, path, bundle_path) sends the parse request to the child over its stdin:

def _on_child_started(
    self,
    callbacks: list[CallbackRequest],
    path: str | os.PathLike[str],
    bundle_path: Path,
) -> None:
    msg = DagFileParseRequest(
        file=os.fspath(path),
        bundle_path=bundle_path,
        requests_fd=self._requests_fd,
        callback_requests=callbacks,
    )
    self.stdin.write(msg.model_dump_json().encode() + b"\n")

 

 

 

 
