sources for test_greenlet.py [rev. 38799]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import py
try:
    from py.magic import greenlet
except (ImportError, RuntimeError), e:
    py.test.skip(str(e))
import sys, gc
from py.test import raises
try:
    import thread, threading
except ImportError:
    thread = None
def test_simple():
    """Ping-pong between the main greenlet and one child, checking the order.

    The even numbers are appended by the test itself and the odd ones by the
    child, so the final list is only sorted if every switch transferred
    control exactly when expected.
    """
    trace = []
    def child():
        trace.append(1)
        # Give control back to the parent greenlet until we are resumed.
        greenlet.getcurrent().parent.switch()
        trace.append(3)
    worker = greenlet(child)
    trace.append(0)
    worker.switch()     # runs child() up to its own switch() call
    trace.append(2)
    worker.switch()     # resumes child(), which then runs to completion
    trace.append(4)
    assert trace == [0, 1, 2, 3, 4]
def test_threads():
    """Run test_simple() in ten threads and check that every one completed."""
    if not thread:
        py.test.skip("this is a test about thread")
    finished = []
    def body():
        test_simple()
        # Only reached when test_simple() did not raise in this thread.
        finished.append(True)
    workers = []
    for _ in range(10):
        workers.append(threading.Thread(target=body))
    # Start all threads before joining any of them so that they overlap.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    assert len(finished) == len(workers)
class SomeError(Exception):
    """Marker exception raised by fmain() and looked for by the tests."""
def fmain(seen):
    """Greenlet body shared by the tests below.

    Switches to the parent greenlet at once.  If an exception is raised at
    that switch point, its type is appended to ``seen`` and the exception is
    re-raised unchanged.  If the switch returns normally, SomeError is raised.
    """
    try:
        greenlet.getcurrent().parent.switch()
    except:
        # Deliberately catch everything: the tests want to observe whatever
        # exception type arrives here, and it is always propagated further.
        exc_type = sys.exc_info()[0]
        seen.append(exc_type)
        raise
    raise SomeError
def test_exception():
    """Check that an exception leaving a greenlet shows up in its parent."""
    recorded = []
    outer = greenlet(fmain)
    inner = greenlet(fmain)
    # Start both greenlets; each one stops inside fmain's try block.
    outer.switch(recorded)
    inner.switch(recorded)
    inner.parent = outer
    assert recorded == []
    # Resuming `inner` lets fmain raise SomeError.  The assertions expect it
    # to be seen once (by `outer`, the new parent) before reaching this test.
    raises(SomeError, inner.switch)
    assert recorded == [SomeError]
    # Switching to `inner` again must neither raise nor record anything new.
    inner.switch()
    assert recorded == [SomeError]
def send_exception(g, exc):
    """Make ``exc`` appear in greenlet ``g`` by crashing a child of ``g``."""
    # note: send_exception(g, exc)  can now be done with  g.throw(exc).
    # the purpose of this test is to explicitly check the propagation rules.
    def crasher(error):
        raise error
    # A throw-away greenlet whose parent is `g`; it is started with `exc`
    # as its argument and raises it immediately.
    helper = greenlet(crasher, parent=g)
    helper.switch(exc)
def test_send_exception():
    """Check that send_exception() delivers the exception into the greenlet.

    fmain() must record KeyError when it arrives at its switch point, and the
    re-raised exception must then come back out to this test.
    """
    seen = []
    g1 = greenlet(fmain)
    # Start g1; it stops inside fmain's try block.
    g1.switch(seen)
    # Use the callable form of raises() (as test_exception does) instead of
    # passing source code as a string to be evaluated.
    raises(KeyError, send_exception, g1, KeyError)
    assert seen == [KeyError]
def test_dealloc():
    """Check that dropping an unfinished greenlet raises GreenletExit in it."""
    recorded = []
    first = greenlet(fmain)
    second = greenlet(fmain)
    # Start both greenlets; each one stops inside fmain's try block.
    first.switch(recorded)
    second.switch(recorded)
    assert recorded == []
    # Drop the only reference to the first greenlet: fmain is expected to
    # see GreenletExit at its switch point.
    del first
    gc.collect()
    assert recorded == [greenlet.GreenletExit]
    # Same again for the second one.
    del second
    gc.collect()
    assert recorded == [greenlet.GreenletExit] * 2
def test_dealloc_other_thread():
    """Check deallocation of a greenlet whose last reference dies elsewhere.

    A worker thread creates and starts a greenlet, then the main thread drops
    the last reference to it.  The assertions expect that GreenletExit is not
    delivered at that moment, but only once the worker thread creates another
    greenlet.  Two locks, both initially held, are used to run the two
    threads in strict alternation.
    """
    if not thread:
        py.test.skip("this is a test about thread")
    seen = []
    someref = []
    # `lock` is released by the worker to wake up the main thread;
    # `lock2` is released by the main thread to wake up the worker.
    lock = thread.allocate_lock()
    lock.acquire()
    lock2 = thread.allocate_lock()
    lock2.acquire()
    def f():
        g1 = greenlet(fmain)
        g1.switch(seen)
        # Keep the greenlet alive through `someref` only.
        someref.append(g1)
        del g1
        gc.collect()
        lock.release()      # step 1 done: let the main thread run
        lock2.acquire()     # wait until the main thread dropped `someref`
        greenlet()   # trigger release
        lock.release()      # step 2 done: let the main thread check `seen`
        lock2.acquire()     # wait for permission to finish
    t = threading.Thread(target=f)
    t.start()
    lock.acquire()          # wait for step 1 of the worker
    assert seen == []
    assert len(someref) == 1
    # Drop the last reference to the greenlet, from the main thread.
    del someref[:]
    gc.collect()
    # g1 is not released immediately because it's from another thread
    assert seen == []
    lock2.release()         # let the worker create a new greenlet
    lock.acquire()          # wait for step 2 of the worker
    assert seen == [greenlet.GreenletExit]
    lock2.release()         # let the worker finish
    t.join()
def test_frame():
    """Check the ``gr_frame`` attribute and truth value of a greenlet.

    While the greenlet is suspended, ``gr_frame`` must be the frame it is
    suspended in and the greenlet must be true; once it has finished,
    ``gr_frame`` must be None and the greenlet must be false.
    """
    def f1():
        f = sys._getframe(0)
        # f1 is the outermost function of its greenlet, so it has no caller.
        assert f.f_back is None
        # Hand our own frame to the parent so it can compare with gr_frame.
        greenlet.getcurrent().parent.switch(f)
        return "meaning of life"
    g = greenlet(f1)
    frame = g.switch()
    assert frame is g.gr_frame
    assert g
    # The second switch lets f1 return; its return value is what we get back.
    result = g.switch()
    assert not g
    assert result == "meaning of life"
    assert g.gr_frame is None
def test_thread_bug():
    """Run a sleeping greenlet in each of two overlapping threads.

    There is nothing to assert; the test passes if both threads finish.
    """
    if not thread:
        py.test.skip("this is a test about thread")
    import time
    def runner(x):
        def nap():
            return time.sleep(x)
        g = greenlet(nap)
        g.switch()
    # Different sleep times, so the two greenlets finish at different moments.
    workers = [threading.Thread(target=runner, args=(delay,))
               for delay in (0.2, 0.3)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()