1 回答

TA貢獻(xiàn)1794條經(jīng)驗(yàn) 獲得超8個贊
解決方案是創(chuàng)建第三個任務(wù)(在示例中Dynamic),它產(chǎn)生等待動態(tài)輸入的任務(wù),并使依賴項(xiàng)成為參數(shù)而不是requires方法。
class ToDatabase(luigi.Task):
fp = luigi.Parameter()
def output(self):
with open(self.fp, 'r') as f:
valid_ids = [str(e) for e in f.read().split(',')]
client = pymongo.MongoClient('localhost', 27017, ssl=False)
return mongodb.MongoRangeTarget(client, 'myDB', 'myData',
valid_ids, 'myField')
def run(self):
with open(self.fp, 'r') as f:
valid_ids = [str(e) for e in f.read().split(',')]
self.output().write({k: 5 for k in valid_ids})
class Dynamic(luigi.Task):
def output(self):
return self.input()
def requires(self):
return MakeIDs()
def run(self):
yield(AddToDatabase(fp=self.input().path))
添加回答
舉報