Luigi - 在任务之间传递 var 作为输出

Luigi - Pass a var as an output between tasks

提问人:Luiscri 提问时间:5/10/2019 最后编辑:Luiscri 更新时间:5/10/2019 访问量:633

问:

我知道我可以通过将变量保存到文件中然后在下一个任务中读取它来在任务之间传递变量。但是,我不想在我的项目上生成这么多文档,所以我尝试将其直接作为变量传递。

我的代码如下所示:

class FilterSpam(luigi.Task):
    time_slice = luigi.parameter.DateMinuteParameter(interval=30, default=datetime.datetime.today())

    filtered_tweets = []

    def requires(self):
        return Streaming(time_slice=self.time_slice)

    def run(self):
        with self.input().open('r') as infile:
            reader = csv.DictReader(infile, delimiter='\t')
            tweets = list(reader)
            self.filtered_tweets, spam = filter_spam(tweets, 0.7)

        with open('data/results/detected_spam.txt', 'a') as spam_file:
            for tweet in spam:    
                spam_json = json.dumps(tweet, ensure_ascii=False)
                spam_file.write(spam_json+'\n')

    def output(self):
        return self.filtered_tweets

class LemmatizeTweets(luigi.Task):
    time_slice = luigi.parameter.DateMinuteParameter(interval=30, default=datetime.datetime.today())

    def requires(self):
        return FilterSpam(time_slice=self.time_slice)

    def run():
        filtered_tweets = self.input() # Lista de diccionarios

        lemmatized_tweets = lemmatize(filtered_tweets)

        with self.output().open('w') as outfile:
            for tweet in lemmatized_tweets:
                tweet_json = json.dumps(tweet, ensure_ascii=False)
                outfile.write(tweet_json+'\n')

    def output(self):
        return luigi.LocalTarget('data/processed/{}.csv'.format(self.time_slice))

其中是字典列表。filtered_tweets

是否可以将此 var 传递给任务而不必将其保存到文件中?如果没有,保存值的最佳方法是什么?在泡菜中,每行中都有一个 json 对象的 .txt...?LemmatizeTweets

python var luigi

评论

0赞 sgerbhctim 10/26/2019
你有没有找到解决这个问题的方法?
0赞 Luiscri 10/27/2019
不完全是。有人建议我在同一任务中完成所有更改。例如,在一个名为“ProcessTweets”的任务中加入了我的“FilterSpam”任务和“LemmatizeTweets”,该任务执行了这两个操作,然后将结果保存到文件中。我不知道这对你是否有帮助,你到底想做什么?

答: 暂无答案