[Airflow] - How Airflow identifies DAGs
Reference: DAG File Processing — Airflow Documentation (airflow.apache.org)
Location: airflow/dag_processing/manager.py
airflow.dag_processing.manager.DagFileProcessorManager
@attrs.define
class DagFileProcessorManager(LoggingMixin):
    """
    Manage processes responsible for parsing DAGs.

    Given a list of DAG definition files, this kicks off several processors
    in parallel to process them and put the results to a multiprocessing.Queue
    for DagFileProcessorAgent to harvest. The parallelism is limited and as the
    processors finish, more are launched. The files are processed over and
    over again, but no more often than the specified interval.

    :param max_runs: The number of times to parse each file. -1 for unlimited.
    :param processor_timeout: How long to wait before timing out a DAG file processor
    """
Location: airflow/dag_processing/processor.py
airflow.dag_processing.processor.DagFileProcessorProcess
@attrs.define(kw_only=True)
class DagFileProcessorProcess(WatchedSubprocess):
    """
    Parses dags with Task SDK API.

    This class provides a wrapper and management around a subprocess to parse a specific DAG file.

    Since DAGs are written with the Task SDK, we need to parse them in a task SDK process such that
    we can use the Task SDK definitions when serializing. This prevents potential conflicts with classes
    in core Airflow.
    """

    @classmethod
    def start(  # type: ignore[override]
        cls,
        *,
        path: str | os.PathLike[str],
        bundle_path: Path,
        callbacks: list[CallbackRequest],
        target: Callable[[], None] = _parse_file_entrypoint,
        **kwargs,
    ) -> Self:
        proc: Self = super().start(target=target, **kwargs)
        proc._on_child_started(callbacks, path, bundle_path)
        return proc
Tracing into super().start(target=target, **kwargs) leads to WatchedSubprocess.start:
@classmethod
def start(
    cls,
    *,
    target: Callable[[], None] = _subprocess_main,
    logger: FilteringBoundLogger | None = None,
    **constructor_kwargs,
) -> Self:
    """Fork and start a new subprocess with the specified target function."""
    # Create socketpairs/"pipes" to connect to the stdin and out from the subprocess
    child_stdin, feed_stdin = mkpipe(remote_read=True)
    child_stdout, read_stdout = mkpipe()
    child_stderr, read_stderr = mkpipe()

    # Open these socketpair before forking off the child, so that it is open when we fork.
    child_comms, read_msgs = mkpipe()
    child_logs, read_logs = mkpipe()

    pid = os.fork()
    if pid == 0:
        # Parent ends of the sockets are closed by the OS as they are set as non-inheritable

        # Python GC should delete these for us, but lets make double sure that we don't keep anything
        # around in the forked processes, especially things that might involve open files or sockets!
        del constructor_kwargs
        del logger

        try:
            # Run the child entrypoint
            _fork_main(child_stdin, child_stdout, child_stderr, child_logs.fileno(), target)
        except BaseException as e:
            try:
                # We can't use log here, as if we except out of _fork_main something _weird_ went on.
                print("Exception in _fork_main, exiting with code 124", e, file=sys.stderr)
            except BaseException:
                pass

        # It's really super super important we never exit this block. We are in the forked child, and if we
        # do then _THINGS GET WEIRD_.. (Normally `_fork_main` itself will `_exit()` so we never get here)
        os._exit(124)

    requests_fd = child_comms.fileno()

    # Close the remaining parent-end of the sockets we've passed to the child via fork. We still have the
    # other end of the pair open
    cls._close_unused_sockets(child_stdin, child_stdout, child_stderr, child_comms, child_logs)

    proc = cls(
        pid=pid,
        stdin=feed_stdin,
        process=psutil.Process(pid),
        requests_fd=requests_fd,
        **constructor_kwargs,
    )

    logger = logger or cast("FilteringBoundLogger", structlog.get_logger(logger_name="task").bind())
    proc._register_pipe_readers(
        logger=logger,
        stdout=read_stdout,
        stderr=read_stderr,
        requests=read_msgs,
        logs=read_logs,
    )

    return proc
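Stripped of the supervision details, start() is the classic fork-with-pipes pattern: create the pipe pairs before forking, run the child entrypoint in the pid == 0 branch and never return from it, and close the child-side ends in the parent. Here is a small POSIX-only sketch of that pattern using socket.socketpair (mkpipe, the stdout/log plumbing, and _register_pipe_readers are Airflow-specific and omitted):

import os
import socket

def start_child(target):
    """Fork a child; return (pid, the parent's end of a socketpair the child writes to)."""
    child_end, parent_end = socket.socketpair()   # open both ends before forking
    pid = os.fork()
    if pid == 0:
        # Child: close the parent's end, run the entrypoint, and never fall out of this branch.
        parent_end.close()
        try:
            target(child_end)
        finally:
            os._exit(0)
    # Parent: close the child's end; the child still holds its copy across the fork.
    child_end.close()
    return pid, parent_end

def entrypoint(sock):
    sock.sendall(b"hello from the child\n")

if __name__ == "__main__":
    pid, conn = start_child(entrypoint)
    print(conn.recv(1024))                        # b'hello from the child\n'
    os.waitpid(pid, 0)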
The default target, _parse_file_entrypoint, is what runs inside the forked child:
def _parse_file_entrypoint():
    import os

    import structlog

    from airflow.sdk.execution_time import task_runner
    from airflow.settings import configure_orm

    # Parse DAG file, send JSON back up!

    # We need to reconfigure the orm here, as DagFileProcessorManager does db queries for bundles, and
    # the session across forks blows things up.
    configure_orm()

    comms_decoder = task_runner.CommsDecoder[DagFileParseRequest, DagFileParsingResult](
        input=sys.stdin,
        decoder=TypeAdapter[DagFileParseRequest](DagFileParseRequest),
    )

    msg = comms_decoder.get_message()
    comms_decoder.request_socket = os.fdopen(msg.requests_fd, "wb", buffering=0)

    log = structlog.get_logger(logger_name="task")
    result = _parse_file(msg, log)
    if result is not None:
        comms_decoder.send_request(log, result)
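The entrypoint is the child's half of a very small protocol: read one JSON message from stdin, validate it into a DagFileParseRequest, then open the extra file descriptor the parent passed along for sending results back. A standalone sketch of the read-and-validate step (ToyRequest is a stand-in model, and the one-JSON-document-per-line framing is inferred from _on_child_started further down):

import sys
from pydantic import BaseModel, TypeAdapter

class ToyRequest(BaseModel):
    file: str
    requests_fd: int

def read_request(stream=sys.stdin) -> ToyRequest:
    # The parent writes a single JSON document followed by "\n", so one readline() is one message.
    return TypeAdapter(ToyRequest).validate_json(stream.readline())

# echo '{"file": "/dags/a.py", "requests_fd": 3}' | python child.py
if __name__ == "__main__":
    msg = read_request()
    print(f"would parse {msg.file} and reply on fd {msg.requests_fd}", file=sys.stderr)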
The messages it decodes are DagFileParseRequest objects (decoder=TypeAdapter[DagFileParseRequest](DagFileParseRequest)):
class DagFileParseRequest(BaseModel):
    """
    Request for DAG File Parsing.

    This is the request that the manager will send to the DAG parser with the dag file and
    any other necessary metadata.
    """

    file: str
    bundle_path: Path
    """Passing bundle path around lets us figure out relative file path."""
    requests_fd: int
    callback_requests: list[CallbackRequest] = Field(default_factory=list)
    type: Literal["DagFileParseRequest"] = "DagFileParseRequest"
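The Literal type field with a fixed default is pydantic's usual tag for telling message kinds apart on the wire; presumably it lets the pipe carry more than one message type. A generic sketch of that discriminated-union pattern, with toy models rather than the real Airflow ones:

from typing import Annotated, Literal, Union
from pydantic import BaseModel, Field, TypeAdapter

class ToyParseRequest(BaseModel):
    type: Literal["ToyParseRequest"] = "ToyParseRequest"
    file: str

class ToyHeartbeat(BaseModel):
    type: Literal["ToyHeartbeat"] = "ToyHeartbeat"

ToyMessage = Annotated[Union[ToyParseRequest, ToyHeartbeat], Field(discriminator="type")]

msg = TypeAdapter(ToyMessage).validate_json('{"type": "ToyParseRequest", "file": "/dags/a.py"}')
assert isinstance(msg, ToyParseRequest)   # the "type" tag selected the right model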
The actual parsing happens in _parse_file(msg, log):
def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileParsingResult | None:
    # TODO: Set known_pool names on DagBag!
    bag = DagBag(
        dag_folder=msg.file,
        bundle_path=msg.bundle_path,
        include_examples=False,
        safe_mode=True,
        load_op_links=False,
    )
    if msg.callback_requests:
        # If the request is for callback, we shouldn't serialize the DAGs
        _execute_callbacks(bag, msg.callback_requests, log)
        return None

    serialized_dags, serialization_import_errors = _serialize_dags(bag, log)
    bag.import_errors.update(serialization_import_errors)
    dags = [LazyDeserializedDAG(data=serdag) for serdag in serialized_dags]
    result = DagFileParsingResult(
        fileloc=msg.file,
        serialized_dags=dags,
        import_errors=bag.import_errors,
        # TODO: Make `bag.dag_warnings` not return SQLA model objects
        warnings=[],
    )
    return result
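Inside _parse_file, DagBag is what actually imports the file and collects the DAG objects defined at module level, recording the traceback of any failed import in import_errors. A toy illustration of that import-and-collect idea (collect_objects is a hypothetical helper, not Airflow code):

import importlib.util
import traceback

def collect_objects(path: str, of_type: type) -> tuple[list, dict[str, str]]:
    """Import a Python file, gather module-level objects of a given type,
    and record the traceback if the import fails -- the essence of what DagBag does."""
    import_errors: dict[str, str] = {}
    try:
        spec = importlib.util.spec_from_file_location("candidate_module", path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
    except Exception:
        import_errors[path] = traceback.format_exc()
        return [], import_errors
    found = [obj for obj in vars(module).values() if isinstance(obj, of_type)]
    return found, import_errors

# e.g. dags, errors = collect_objects("/opt/airflow/dags/example.py", of_type=object)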
Back in DagFileProcessorProcess.start, proc._on_child_started(callbacks, path, bundle_path) sends the parse request down to the child:
def _on_child_started(
    self,
    callbacks: list[CallbackRequest],
    path: str | os.PathLike[str],
    bundle_path: Path,
) -> None:
    msg = DagFileParseRequest(
        file=os.fspath(path),
        bundle_path=bundle_path,
        requests_fd=self._requests_fd,
        callback_requests=callbacks,
    )
    self.stdin.write(msg.model_dump_json().encode() + b"\n")
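_on_child_started is the parent's half of the exchange: serialize the request with model_dump_json() and write it to the child's stdin, newline-terminated, which is exactly the framing the child's readline-style decoder expects. A self-contained sketch of that framing over a plain os.pipe() (ToyRequest again stands in for DagFileParseRequest):

import os
from pydantic import BaseModel

class ToyRequest(BaseModel):
    file: str
    requests_fd: int

read_fd, write_fd = os.pipe()                                 # stand-in for the child's stdin pipe

msg = ToyRequest(file="/dags/a.py", requests_fd=3)
os.write(write_fd, msg.model_dump_json().encode() + b"\n")    # same framing as _on_child_started
os.close(write_fd)

print(os.read(read_fd, 1024))                                 # b'{"file":"/dags/a.py","requests_fd":3}\n'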