码头工人 – 由于Luigi工作分布不均而导致工人死亡(2.6.1)

我们正在尝试运行分布在docker swarm集群上的简单管道. luigi worker被部署为复制的docker服务.他们成功启动,在向luigi-server请求工作几秒钟之后,由于没有为他们分配工作而他们开始死亡,所有任务最终都分配给一个工作人员.

我们不得不在我们工人的luigi.cfg中设置keep_alive = True来强迫他们不要死,但是在管道完成后让工人保持身边似乎是一个坏主意.
有没有办法控制工作分配?

我们的测试管道:

class RunAllTasks(luigi.Task):

    tasks = luigi.IntParameter()
    sleep_time = luigi.IntParameter()

    def requires(self):
        for i in range(self.tasks):
            yield RunExampleTask(i,self.sleep_time)

    def run(self):
        with self.output().open('w') as f:
            f.write('All done!')

    def output(self):
        return LocalTarget('/data/RunAllTasks.txt')


class RunExampleTask(luigi.Task):

    number = luigi.IntParameter()
    sleep_time = luigi.IntParameter()

    @property
    def cmd(self):
        return """
               docker run --rm --name example_{number} hello-world
           """.format(number=self.number)

    def run(self):
        time.sleep(self.sleep_time)
        logger.debug(self.cmd)
        out = subprocess.check_output(self.cmd,stderr=subprocess.STDOUT,shell=True)
        logger.debug(out)
        with self.output().open('w') as f:
            f.write(str(out))

    def output(self):
        return LocalTarget('/data/{number}.txt'.format(number=self.number))


if __name__ == "__main__":
    luigi.run()
最佳答案
您的问题是一次产生一个要求的结果,而您希望立即产生所有这些要求,如下所示:

def requires(self):
    reqs = []
    for i in range(self.tasks):
        reqs.append(RunExampleTask(i,self.sleep_time))
    yield reqs

相关文章

Docker 可以让开发者打包他们的应用以及依赖包到一个轻量级、可移植的容器中,然后发布到任何流行的 Li...
1、什么是docker?答:docker是开源的应用容器引擎;开发人员把他们的应用及依赖包打包发布到容器当中。...
1、什么是namespace? 答:名称空间,作用隔离容器 2、namespace隔离有那些? 答:ipc:共享内存、消息队...
1、Docker能在非Linux平台(Windows+MacOS)上运行吗? 答:可以 2 、如何将一台宿主机的docker环境...
环境要求: IP hostname 192.168.1.1 node1 项目规划: 容器网段:172.16.10.0/24 NGINX:172.16.10.10...
文档上传地址:https://files.cnblogs.com/files/lin-strive/07-docker%E8%B7%A8%E4%B8%BB%E6%9C%BA%E7...