[Python-modules-team] Bug#955772: dask.distributed: flaky autopkgtest: timeout reached in test_robust_to_bad_sizeof_estimates

Paul Gevers elbrus at debian.org
Sat Apr 4 20:35:58 BST 2020


Source: dask.distributed
Version: 2.10.0+ds.1-3
Severity: serious
Tags: sid bullseye
X-Debbugs-CC: debian-ci at lists.debian.org
User: debian-ci at lists.debian.org
Usertags: flaky

Dear maintainer(s),

Your package has an autopkgtest, great. However, until recently it always
failed. With the upload of 2.10.0+ds.1-3 it now *sometimes* passes.

Because the unstable-to-testing migration software now blocks on
regressions in testing, flaky tests, i.e. tests that flip between
passing and failing without changes to the list of installed packages,
are causing people unrelated to your package to spend time on these
tests. Please either fix the test to be more robust, or use the
"flaky" restriction for the offending test until a solution has been found.

I copied the output at the bottom of this report. All the failing tests
that I inspected look like this one.

I'll have the migration software ignore the results of your autopkgtest
until this bug is fixed.

Paul

https://ci.debian.net/data/autopkgtest/testing/amd64/d/dask.distributed/4801510/log.gz

=================================== FAILURES
===================================
_____________________ test_robust_to_bad_sizeof_estimates
______________________

    def test_func():
        result = None
        workers = []
        with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:

            async def coro():
                with dask.config.set(config):
                    s = False
                    for i in range(5):
                        try:
                            s, ws = await start_cluster(
                                nthreads,
                                scheduler,
                                loop,
                                security=security,
                                Worker=Worker,
                                scheduler_kwargs=scheduler_kwargs,
                                worker_kwargs=worker_kwargs,
                            )
                        except Exception as e:
                            logger.error(
                                "Failed to start gen_cluster, retrying",
                                exc_info=True,
                            )
                        else:
                            workers[:] = ws
                            args = [s] + workers
                            break
                    if s is False:
                        raise Exception("Could not start cluster")
                    if client:
                        c = await Client(
                            s.address,
                            loop=loop,
                            security=security,
                            asynchronous=True,
                            **client_kwargs
                        )
                        args = [c] + args
                    try:
                        future = func(*args)
                        if timeout:
                            future = asyncio.wait_for(future, timeout)
                        result = await future
                        if s.validate:
                            s.validate_state()
                    finally:
                        if client and c.status not in ("closing", "closed"):
                            await c._close(fast=s.status == "closed")
                        await end_cluster(s, workers)
                        await asyncio.wait_for(cleanup_global_workers(), 1)

                    try:
                        c = await default_client()
                    except ValueError:
                        pass
                    else:
                        await c._close(fast=True)

                    for i in range(5):
                        if all(c.closed() for c in Comm._instances):
                            break
                        else:
                            await asyncio.sleep(0.05)
                    else:
                        L = [c for c in Comm._instances if not c.closed()]
                        Comm._instances.clear()
                        # raise ValueError("Unclosed Comms", L)
                        print("Unclosed Comms", L)

                    return result

            result = loop.run_sync(
>               coro, timeout=timeout * 2 if timeout else timeout
            )

/usr/lib/python3/dist-packages/distributed/utils_test.py:957:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _ _ _
/usr/lib/python3/dist-packages/tornado/ioloop.py:576: in run_sync
    return future_cell[0].result()
/usr/lib/python3/dist-packages/distributed/utils_test.py:927: in coro
    result = await future
/usr/lib/python3.7/asyncio/tasks.py:442: in wait_for
    return fut.result()
/usr/lib/python3/dist-packages/tornado/gen.py:1162: in run
    yielded = self.gen.send(value)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _ _ _

c = <Client: not connected>
s = <Scheduler: "tcp://127.0.0.1:33643" processes: 0 cores: 0>
a = <Worker: 'tcp://127.0.0.1:40861', 0, closed, stored: 0, running:
0/1, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(
        nthreads=[("127.0.0.1", 1)],
        client=True,
        worker_kwargs={"memory_monitor_interval": 10},
    )
    def test_robust_to_bad_sizeof_estimates(c, s, a):
        np = pytest.importorskip("numpy")
        memory = psutil.Process().memory_info().rss
        a.memory_limit = memory / 0.7 + 400e6

        class BadAccounting(object):
            def __init__(self, data):
                self.data = data

            def __sizeof__(self):
                return 10

        def f(n):
            x = np.ones(int(n), dtype="u1")
            result = BadAccounting(x)
            return result

        futures = c.map(f, [100e6] * 8, pure=False)

        start = time()
        while not a.data.disk:
            yield gen.sleep(0.1)
>           assert time() < start + 5
E           assert 1585911987.756759 < (1585911982.712976 + 5)
E            +  where 1585911987.756759 = time()

/usr/lib/python3/dist-packages/distributed/tests/test_worker.py:1121:
AssertionError

-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 488 bytes
Desc: OpenPGP digital signature
URL: <http://alioth-lists.debian.net/pipermail/python-modules-team/attachments/20200404/d122939c/attachment.sig>


More information about the Python-modules-team mailing list