Source code for dags.output
import functools
[docs]def single_output(func):
"""Convert tuple output to single output."""
@functools.wraps(func)
def wrapper_single_output(*args, **kwargs):
raw = func(*args, **kwargs)
out = raw[0]
return out
return wrapper_single_output
[docs]def dict_output(func=None, *, keys=None):
"""Convert tuple output to dict output."""
def decorator_dict_output(func):
@functools.wraps(func)
def wrapper_dict_output(*args, **kwargs):
raw = func(*args, **kwargs)
out = dict(zip(keys, raw))
return out
return wrapper_dict_output
if callable(func):
return decorator_dict_output(func)
else:
return decorator_dict_output
[docs]def list_output(func):
"""Convert tuple output to list output."""
@functools.wraps(func)
def wrapper_list_output(*args, **kwargs):
raw = func(*args, **kwargs)
out = list(raw)
return out
return wrapper_list_output
[docs]def aggregated_output(func=None, *, aggregator=None):
"""Aggregate tuple output."""
def decorator_aggregated_output(func):
@functools.wraps(func)
def wrapper_aggregated_output(*args, **kwargs):
raw = func(*args, **kwargs)
agg = raw[0]
for entry in raw[1:]:
agg = aggregator(agg, entry)
return agg
return wrapper_aggregated_output
if callable(func):
return decorator_aggregated_output(func)
else:
return decorator_aggregated_output