1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 __version__ = "$Rev$"
19
20 import time
21
22 from flumotion.common import log
23
24 from twisted.internet import reactor
25
26 from flumotion.component.plugs import base as plugbase
27
28
37
38
40
42 props = args['properties']
43 self._rateBytesPerSec = int(props.get('rate', 128000) / 8)
44
45
46 self._maxLevel = int(props.get('max-level',
47 self._rateBytesPerSec * 8 * 10) / 8)
48 self._initialLevel = int(props.get('initial-level', 0) / 8)
49
53
54
56 """
57 Use a token bucket to proxy between a producer (e.g. FileTransfer) and a
58 consumer (TCP protocol, etc.), doing rate control.
59
60 The bucket has a rate and a maximum level, so a small burst can be
61 permitted. The initial level can be set to a non-zero value, this is
62 useful to implement burst-on-connect behaviour.
63
64 TODO: This almost certainly only works with producers that work like
65 FileTransfer - i.e. they produce data directly in resumeProducing, and
66 ignore pauseProducing. This is sufficient for our needs right now.
67 """
68
69 logCategory = 'token-bucket'
70
71
72
73
74
75
76 _dripInterval = 1.0
77
78
79
80 - def __init__(self, consumer, maxLevel, fillRate, fillLevel=0):
81 self.maxLevel = maxLevel
82 self.fillRate = fillRate
83 self.fillLevel = fillLevel
84
85 self._buffers = []
86 self._buffersSize = 0
87
88 self._finishing = False
89
90
91 self._unregister = False
92
93
94 self._lastDrip = time.time()
95 self._dripDC = None
96 self._paused = True
97
98 self.producer = None
99 self.consumer = consumer
100
101
102
103
104
105
106
107 self.consumer.registerProducer(self, 1)
108
109 self.info("Created TokenBucketConsumer with rate %d, "
110 "initial level %d, maximum level %d",
111 fillRate, fillLevel, maxLevel)
112
114 """
115 Re-fill our token bucket based on how long it has been since we last
116 refilled it.
117 Then attempt to write some data.
118 """
119 self._dripDC = None
120
121 now = time.time()
122 elapsed = now - self._lastDrip
123 self._lastDrip = now
124
125 newBytes = self.fillRate * elapsed
126
127
128
129 self.fillLevel = int(min(self.fillLevel + newBytes, self.maxLevel))
130
131 self._tryWrite()
132
134 if not self.consumer:
135 return
136
137 while self.fillLevel > 0 and self._buffersSize > 0:
138
139 offset, buf = self._buffers[0]
140 sendbuf = buf[offset:offset+self.fillLevel]
141 sendBytes = len(sendbuf)
142
143 if sendBytes + offset == len(buf):
144 self._buffers.pop(0)
145 else:
146 self._buffers[0] = (offset + sendBytes, buf)
147 self._buffersSize -= sendBytes
148
149 self.consumer.write(sendbuf)
150 self.fillLevel -= sendBytes
151
152 if self._buffersSize > 0:
153
154
155
156 if not (self._dripDC or self._paused):
157 self._dripDC = reactor.callLater(self._dripInterval,
158 self._dripAndTryWrite)
159 else:
160
161 if self._finishing:
162 if self._unregister:
163 self._doUnregister()
164 self._doFinish()
165 elif self.producer:
166 self.producer.resumeProducing()
167 elif self._unregister:
168 self._doUnregister()
169
173
178
180 self.debug('stopProducing; buffered data: %d', self._buffersSize)
181 if self.producer is not None:
182 self.producer.stopProducing()
183
184 if self._dripDC:
185
186 self._dripDC.cancel()
187 self._dripDC = None
188
189
190 if self._unregister:
191 self._doUnregister()
192
193 if self._finishing:
194 self._finishing = False
195 self.consumer.finish()
196
197 if self._buffersSize > 0:
198
199 self._buffers = []
200 self._buffersSize = 0
201
202 self.consumer = None
203
205 self._paused = True
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229 if self.producer:
230 self.producer.pauseProducing()
231
232
233
234 if self._dripDC:
235 self._dripDC.cancel()
236 self._dripDC = None
237
239 self._paused = False
240 self._tryWrite()
241
242 if not self._buffers and self.producer:
243 self.producer.resumeProducing()
244
246 self._buffers.append((0, data))
247 self._buffersSize += len(data)
248
249 self._tryWrite()
250
251 if self._buffers and not self.fillLevel and self.producer:
252
253
254
255 self.producer.pauseProducing()
256
258 if self._dripDC:
259 self._finishing = True
260 elif self.consumer:
261 self._doFinish()
262
264 self.debug("Producer registered: %r", producer)
265 self.producer = producer
266
267 self.resumeProducing()
268
270 self.debug('unregisterProducer; buffered data: %d', self._buffersSize)
271 if self.producer is not None:
272 self.producer = None
273
274 if not self._dripDC:
275 self._doUnregister()
276 else:
277
278 self._unregister = True
279