sources for test_pool.py [rev. 38799]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import py
import sys
WorkerPool = py._thread.WorkerPool
ThreadOut = py._thread.ThreadOut
def test_some():
    pool = WorkerPool()
    q = py.std.Queue.Queue()
    num = 4
    def f(i): 
        q.put(i) 
        while q.qsize(): 
            py.std.time.sleep(0.01) 
    for i in range(num):
        pool.dispatch(f, i) 
    for i in range(num):
        q.get()
    assert len(pool._alive) == 4
    pool.shutdown()
    # XXX I replaced the following join() with a time.sleep(1), which seems
    # to fix the test on Windows, and doesn't break it on Linux... Completely
    # unsure what the idea is, though, so it would be nice if someone with some
    # more understanding of what happens here would either fix this better, or
    # remove this comment...
    # pool.join(timeout=1.0)
    py.std.time.sleep(1)
    assert len(pool._alive) == 0
    assert len(pool._ready) == 0
def test_get():
    pool = WorkerPool()
    def f(): 
        return 42
    reply = pool.dispatch(f) 
    result = reply.get() 
    assert result == 42 
def test_get_timeout():
    pool = WorkerPool()
    def f(): 
        py.std.time.sleep(0.2) 
        return 42
    reply = pool.dispatch(f) 
    py.test.raises(IOError, "reply.get(timeout=0.01)") 
def test_get_excinfo():
    pool = WorkerPool()
    def f(): 
        raise ValueError("42") 
    reply = pool.dispatch(f) 
    excinfo = py.test.raises(ValueError, "reply.get(1.0)") 
    py.test.raises(EOFError, "reply.get(1.0)") 
def test_maxthreads():
    pool = WorkerPool(maxthreads=1)
    def f():
        py.std.time.sleep(0.5)
    try:
        pool.dispatch(f)
        py.test.raises(IOError, pool.dispatch, f)
    finally:
        pool.shutdown()
def test_join_timeout():
    pool = WorkerPool()
    q = py.std.Queue.Queue()
    def f():
        q.get() 
    reply = pool.dispatch(f)
    pool.shutdown()
    py.test.raises(IOError, pool.join, 0.01)
    q.put(None)
    reply.get(timeout=1.0) 
    pool.join(timeout=0.1) 
def test_pool_clean_shutdown():
    capture = py.io.StdCaptureFD() 
    pool = WorkerPool()
    def f():
        pass
    pool.dispatch(f)
    pool.dispatch(f)
    pool.shutdown()
    pool.join(timeout=1.0)
    assert not pool._alive
    assert not pool._ready
    out, err = capture.reset()
    print out
    print >>sys.stderr, err
    assert err == ''