1 """GzipStream & GzipStreamXL Classes
2
3 GzipStream (Python v1.5.2 - v2.2.*):
4 A streaming gzip handler.
5 gzipstream.GzipStream extends the functionality of the gzip.GzipFile class
6 to allow the processing of streaming data.
7 This is done by buffering the stream as it passes through (a seekable
8 object is needed).
9
10 GzipStreamXL (Python v1.5.2/v2.1.* --- ie. not v2.2.*):
11 A streaming gzip handler for very large files.
12
13 _StreamBuf:
14 Allow seeks on socket-like objects -- support GzipStream class.
15 Enables non-seekable file-like objects some flexibility as regards to
16 seeking. It does this via a buffer, a StringIO object. Note, because
17 it is assumed that a socket stream is being manipulated, once the buffer
18 "window" has passed over a data segment, seeking prior to that is not
19 allowed.
20
21 XXX: Eventually, I wish to merge this with the gzip.GzipFile somehow and
22 submit to the python folks.
23
24 Author: Todd Warner <taw@redhat.com>
25 Copyright (c) 2002-2010, Red Hat, Inc.
26 Released under GPLv2 license
27 """


import sys
import gzip
from gzip import zlib
from types import IntType, LongType
import struct
import string
try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO

_DEBUG_YN = 0
if _DEBUG_YN:
    import time
    try:
        import thread
    except:
        pass
64 """Return 1 for Python versions 1.5.* and 2.1.*
65 Return 2 for Python versions 2.2+.*
66 """
67 minor = int(string.split(string.split(sys.version)[0], '.')[1])
68 if minor < 2:
69 return 1
70 return 2
71 _SYS_VERSION = __getSysVersion()
72
73
75 """Handle streaming gzipped data
76
77 GzipStream extends the functionality of the gzip.GzipFile class.
78 gzip.GzipFile generally needs a seekable object. This doesn't allow for
79 streaming gzipped data to be processed easily (e.g. can't seek a socket).
80 Using the _StreamBuf class enables streaming gzipped data to be processed
81 by buffering that data at it passes through.
82
83 For Python versions 1.5.2 & 2.1.*:
84 Normal data version.
85 Normally sized data stream version == faster.
86 For very large data streams (2.5GB-ish), use GzipStreamXL.
87 """
88 VERSION = _SYS_VERSION
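
    # Write-mode sketch (illustrative, not part of the original module):
    # compressing onto something that cannot seek, e.g. a pipe. The name
    # pipe_fileobj and the payload string are assumptions for the example.
    #
    #     gz = GzipStream(stream=pipe_fileobj, mode='wb', compresslevel=9)
    #     gz.write("payload bytes")
    #     gz.close()    # emits the gzip trailer through the _StreamBuf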

    def __init__(self, stream=None, mode=None, compresslevel=9):
        if stream is None:
            stream = sys.stdout

        mode = self._initModeLogic(stream, mode)

        # Wrap the raw stream in a _StreamBuf so that gzip.GzipFile has
        # something it can "seek" on.
        if not isinstance(stream, _StreamBuf):
            self.stream = _StreamBuf(stream, mode)
        else:
            self.stream = stream
        self._gzip = gzip
        self._gzip.GzipFile.__init__(self, '', mode, compresslevel, self.stream)

    def _initModeLogic(self, stream, mode):
        "attempt to determine the mode"
        _mode = None
        _modes = ''
        if hasattr(stream, 'mode'):
            _mode = stream.mode
            _modes = _mode
        # No usable mode attribute: infer capabilities from the methods.
        if not _mode and hasattr(stream, 'read'):
            _modes = _modes + 'r'
        if not _mode and hasattr(stream, 'write'):
            _modes = _modes + 'w'

        if not _mode and not mode:
            # Nothing given at all: default to reading if possible.
            if 'r' in _modes:
                mode = _mode = 'rb'
            elif 'w' in _modes:
                mode = _mode = 'wb'
        elif not mode:
            mode = _mode

        if mode[0] not in _modes:
            raise ValueError, 'Mode %s not supported' % mode
        return mode

    def _read(self, size=1024):
        # Overloads gzip.GzipFile._read() so that it reads from the buffered
        # _StreamBuf object rather than from a real, seekable file.
        if self.stream is None:
            raise EOFError, "Reached EOF"

        if self._new_member:
            # If the _new_member flag is set, we have to jump to the next
            # member, if there is one.
            #
            # First, check whether we're at the end of the stream; if so,
            # there are no more members to read.
            pos = self.stream.tell()
            self.stream.seek(pos+1)
            # If the position didn't move, the stream is exhausted.
            if pos == self.stream.tell():
                self.stream.close()
                self.stream = None
                raise EOFError, "Reached EOF"
            else:
                self.stream.seek( pos )

            self._init_read()
            self._read_gzip_header()
            self.decompress = zlib.decompressobj(-zlib.MAX_WBITS)
            self._new_member = 0

        # Read a chunk of data from the stream.
        buf = self.stream.read(size)

        # If the EOF has been reached, flush the decompression object and
        # mark this object as finished.
        if buf == "":
            uncompress = self.decompress.flush()
            self._read_eof()
            self.stream.close()
            self.stream = None
            self._add_read_data( uncompress )
            raise EOFError, 'Reached EOF'

        uncompress = self.decompress.decompress(buf)
        self._add_read_data( uncompress )

        if self.decompress.unused_data != "":
            # Ending case: we've come to the end of a member in the stream,
            # so seek back to the start of the unused data, finish up this
            # member, and get ready to read a new gzip header. (The +8
            # compensates for the 8 bytes that _read_eof() rewinds before
            # reading the CRC and size trailer.)
            self.stream.seek( -len(self.decompress.unused_data)+8, 1)

            # Check the CRC and stream size; a new member starts on the
            # next read.
            self._read_eof()
            self._new_member = 1

    def seek(self, offset):
        raise IOError, 'Random access not allowed in gzip streams'

    def __repr__(self):
        ret = ''
        if self.stream._closedYN:
            ret = "<closed gzipstream.GzipStream instance, mode '%s' at %s>" % \
                  (self.stream.mode, id(self))
        else:
            ret = "<open gzipstream.GzipStream instance, mode '%s' at %s>" % \
                  (self.stream.mode, id(self))
        return ret

    def _read_eof(self):
        # For very large members the size counter has overflowed into a
        # Python long; use the unsigned 32-bit reader for the trailer.
        if type(self.size) == LongType:
            self._gzip.read32 = self._read32XL
        self._gzip.GzipFile._read_eof(self)

    def close(self):
        if self.stream and self.stream._closedYN:
            # Already closed.
            return

        if hasattr(self, 'size'):
            if type(self.size) == LongType:
                self._gzip.write32 = self._gzip.write32u
        else:
            # Never written to; use the unsigned writer regardless.
            self._gzip.write32 = self._gzip.write32u
        self._gzip.GzipFile.close(self)
        if self.stream:
            self.stream.close()

    def _read32XL(self, input):
        """Allow for very large files/streams to be processed.
        Slows things down, but...

        Used by Python v2.2.*.
        Also used by Python v1.5.2/v2.1.* in inheriting class GzipStreamXL.
        """
        # "<L" unpacks the 4 bytes unsigned, so member sizes past 2GB don't
        # come back negative the way a signed unpack would.
        return struct.unpack("<L", input.read(4))[0]


if _SYS_VERSION == 1:
    class GzipStreamXL(GzipStream):
        """Handle streaming gzipped data -- large data version.

        Very large sized data stream version -- slooower.
        For normally sized data streams (< 2.5GB-ish), use GzipStream.
        """
        def __init__(self, stream=None, mode=None, compresslevel=9):
            # NOTE: the original body of this constructor is missing from
            # this copy of the file; delegating to GzipStream is the minimal
            # reconstruction.
            GzipStream.__init__(self, stream, mode, compresslevel)

        def _init_write(self, filename):
            """Make size a long in order to support very large files.
            """
            GzipStream._init_write(self, filename)
            self.size = 0L

        def _init_read(self):
            """Make size a long in order to support very large files.
            """
            GzipStream._init_read(self)
            self.size = 0L


class _StreamBuf:
    """Stream buffer for file-like objects.

    Allow seeks on socket-like objects.
    Gives non-seekable file-like objects some flexibility with regard to
    seeking. It does this via a buffer, a StringIO object. Note, because
    it is assumed that a socket stream is being manipulated, once the buffer
    "window" has passed over a data segment, seeking prior to that is not
    allowed.
    XXX: probably reinventing the wheel.
    """
    __MIN_READ_SIZE = 1024 * 2
    __MAX_BUFIO_SIZE = __MIN_READ_SIZE * 10
    __ABS_MAX_BUFIO_SIZE = __MAX_BUFIO_SIZE * 2

    # Largest value an int counter may reach before being promoted to a long
    # (see __checkInt).
    __INT_CHECK_SIZE = sys.maxint - __ABS_MAX_BUFIO_SIZE - 2

    VERSION = _SYS_VERSION
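
    # Illustrative behavior of the seek "window" (not part of the original
    # module); sock is an assumed connected socket:
    #
    #     sb = _StreamBuf(sock.makefile('rb'), 'rb')
    #     sb.read(1024)
    #     sb.seek(0)        # OK: offset 0 is still inside the buffered window
    #     sb.read(10 * 1024 * 1024)
    #     sb.seek(0)        # IOError: the window has scrolled past offset 0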

    def __init__(self, stream=None, mode=None):
        """Constructor.
        stream: an open file-like object.
        """
        self.fo = stream
        self._readableYN = 0
        self._writableYN = 0

        if self.fo is None:
            self.fo = StringIO()
            mode = 'wb'
            self._readableYN = 1
            self._writableYN = 1

        # Determine the mode if it wasn't given explicitly.
        if mode is None:
            try:
                mode = self.fo.mode
            except:
                pass

        # Only 'r', 'rb', 'w' and 'wb' make sense here.
        if not mode or (type(mode) == type("") \
           and (mode[0] not in 'rw' or (len(mode) > 1 and mode[1] != 'b'))):
            raise IOError, (22, "Invalid argument: mode=%s" % repr(mode))

        if mode[0] == 'r':
            self._readableYN = 1
        else:
            self._writableYN = 1

        # Fail early (AttributeError) if the stream can't actually do what
        # the mode claims.
        if self._readableYN:
            self.fo.read
        if self._writableYN:
            self.fo.write

        self._closedYN = 0
        self._currFoPos = 0
        self._bufIO = StringIO()
        self._lenBufIO = 0
        self.mode = mode

        self.__mutexOnYN = 0
        if _DEBUG_YN and globals().has_key('thread'):
            thread.start_new(self.__debugThread, ())

    def __del__(self):
        "Destructor"
        # Flush anything still buffered; ignore errors during interpreter
        # teardown.
        try:
            self.close()
        except:
            pass

    def isatty(self):
        # NOTE: the original def statement for this method is missing from
        # this copy of the file; isatty() is a reconstructed name that fits
        # the closed-check plus "return 0" body.
        if self._closedYN:
            raise ValueError, "I/O operation on closed _StreamBuf object"
        return 0

    def _read(self, size):
        """A buffered read --- refactored.
        """
        if self._closedYN:
            raise ValueError, "I/O operation on closed _StreamBuf object"
        if not self._readableYN:
            raise IOError, (9, "Can't read from a write only object")
        tell = self._bufIO.tell()
        # Satisfy as much of the request as possible from the buffer.
        bufIO = self._bufIO.read(size)
        lbufIO = len(bufIO)
        bufFo = ''
        lbufFo = 0
        if lbufIO < size:
            # Not enough buffered data: pull more from the real stream.
            buf = self.fo.read(_StreamBuf.__MIN_READ_SIZE)
            bufFo = buf
            lbufFo = len(bufFo)
            while buf and lbufFo + lbufIO < size:
                buf = self.fo.read(_StreamBuf.__MIN_READ_SIZE)
                bufFo = '%s%s' % (bufFo, buf)
                lbufFo = len(bufFo)
            self._bufIO.write(bufFo)
            self.__mutexOnYN = 1
            self._lenBufIO = self._lenBufIO + lbufFo
            self.__mutexOnYN = 0
        if lbufIO + lbufFo < size:
            size = lbufIO + lbufFo
        self._bufIO.seek(tell + size)
        if _StreamBuf.VERSION == 1:
            self._currFoPos = self.__checkInt(self._currFoPos)
        self._currFoPos = self._currFoPos + size
        bufFo = bufFo[:size-lbufIO]
        self._refactorBufIO()
        return '%s%s' % (bufIO, bufFo)

    def read(self, size=None):
        """A buffered read.
        """
        if size and size < 0:
            raise IOError, (22, "Invalid argument")
        if not self._readableYN:
            raise IOError, (9, "Can't read from a write only object")
        fetchSize = _StreamBuf.__MAX_BUFIO_SIZE
        if size:
            fetchSize = min(fetchSize, size)
        buf = self._read(fetchSize)
        bufOut = buf
        accumSize = len(buf)
        while buf:
            if size and accumSize >= size:
                break
            buf = self._read(fetchSize)
            bufOut = '%s%s' % (bufOut, buf)
            if _StreamBuf.VERSION == 1:
                accumSize = self.__checkInt(accumSize)
            accumSize = accumSize + len(buf)
        return bufOut

    def readline(self):
        """Return one line of text: a string ending in a newline or at EOF.
        """
        if self._closedYN:
            raise ValueError, "I/O operation on closed _StreamBuf object"
        if not self._readableYN:
            raise IOError, (9, "Can't read from a write only object")
        line = ''
        buf = self.read(_StreamBuf.__MIN_READ_SIZE)
        while buf:
            i = string.find(buf, '\n')
            if i >= 0:
                # Found the end of a line: push the excess back into the
                # buffer window and keep everything up to and including
                # the newline.
                i = i + 1
                self._bufIO.seek(-(len(buf)-i), 1)
                buf = buf[:i]
                line = '%s%s' % (line, buf)
                break
            line = '%s%s' % (line, buf)
            buf = self.read(_StreamBuf.__MIN_READ_SIZE)
        return line

    def readlines(self):
        """Read the entire file into memory! And return a list of lines of text.
        """
        if self._closedYN:
            raise ValueError, "I/O operation on closed _StreamBuf object"
        if not self._readableYN:
            raise IOError, (9, "Can't read from a write only object")
        lines = []
        line = self.readline()
        while line:
            lines.append(line)
            line = self.readline()
        return lines
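
    # NOTE: the body of _refactorBufIO() is missing from this copy of the
    # file; only its call sites in _read(), write() and flush() survive.
    # What follows is a reconstructed sketch of what it plausibly does
    # (trim/flush the StringIO "window"), not the original implementation.
    def _refactorBufIO(self, forceFlushYN=0):
        """Maintain the buffer "window" (reconstructed sketch).

        Write mode: return the buffered data that should be pushed down to
                    the underlying file object (everything if forceFlushYN).
        Read mode:  discard data that has scrolled out of the window; the
                    return value is unused by the read path.
        """
        if self._writableYN:
            if forceFlushYN or self._lenBufIO > _StreamBuf.__MAX_BUFIO_SIZE:
                data = self._bufIO.getvalue()
                self._bufIO = StringIO()
                self.__mutexOnYN = 1
                self._lenBufIO = 0
                self.__mutexOnYN = 0
                return data
            return ''
        # Read mode: once the buffer grows past the hard limit, keep only the
        # trailing window and remap the current position into it.
        if self._lenBufIO > _StreamBuf.__ABS_MAX_BUFIO_SIZE:
            tell = self._bufIO.tell()
            value = self._bufIO.getvalue()
            keep = _StreamBuf.__MAX_BUFIO_SIZE
            dropped = len(value) - keep
            self._bufIO = StringIO()
            self._bufIO.write(value[-keep:])
            self._bufIO.seek(max(0, tell - dropped))
            self.__mutexOnYN = 1
            self._lenBufIO = keep
            self.__mutexOnYN = 0
        return ''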

    def __debugState(self):
        """Debug code.
        """
        # NOTE: the original def statement for this debug helper is missing
        # from this copy of the file; __debugState is a stand-in name.
        err = sys.stderr.write
        err('self._lenBufIO: %s/%s\n' % (self._lenBufIO,
                                         len(self._bufIO.getvalue())))
        err('self._currFoPos: %s\n' % self._currFoPos)
        err('self._readableYN: %s\n' % self._readableYN)
        err('self._writableYN: %s\n' % self._writableYN)
        err('self._closedYN: %s\n' % self._closedYN)

    def write(self, s):
        """Write string to stream.
        """
        if self._closedYN:
            raise ValueError, "I/O operation on closed _StreamBuf object"
        if not self._writableYN:
            raise IOError, (9, "Can't write to a read only object")
        self._bufIO.write(s)
        if _StreamBuf.VERSION == 1:
            self._currFoPos = self.__checkInt(self._currFoPos)
        self._currFoPos = self._currFoPos + len(s)
        self.__mutexOnYN = 1
        self._lenBufIO = self._lenBufIO + len(s)
        self.__mutexOnYN = 0
        # Push whatever the buffer is ready to give up down to the real
        # file object.
        self.fo.write(self._refactorBufIO())
494 """Given list, concatenate and write.
495 """
496 if self._closedYN:
497 raise ValueError, "I/O operation on closed _StreamBuf object"
498 if not self._writableYN:
499 raise IOError, (9, "Can't write to a read only object")
500 for s in l:
501 self.write(s)
502

    def seek(self, offset, where=0):
        """A limited seek method. See class __doc__ for more details.
        """
        if self._closedYN:
            raise ValueError, "I/O operation on closed _StreamBuf object"

        tell = self._bufIO.tell()
        beginBuf = self._currFoPos - tell
        endBuf = self._lenBufIO + beginBuf - 1

        # Normalize offset to an absolute stream position.
        if not where:
            # whence == 0: offset is already absolute.
            pass
        elif where == 1:
            # whence == 1: relative to the current position.
            if _StreamBuf.VERSION == 1:
                offset = self.__checkInt(offset)
            offset = self._currFoPos + offset
        elif where == 2:
            # whence == 2: relative to the end of the stream.
            if self._readableYN:
                if offset < 0 and offset < _StreamBuf.__ABS_MAX_BUFIO_SIZE:
                    raise IOError, (22, "Invalid argument; can't determine %s "
                                        "position due to unknown stream length" % offset)
                # Drain the stream so that "the end" is actually known.
                while self.read(_StreamBuf.__MAX_BUFIO_SIZE):
                    pass
                self._currFoPos = self._currFoPos + offset
                self._bufIO.seek(offset, 2)
                return
            elif self._writableYN:
                offset = endBuf + offset
        else:
            raise IOError, (22, "Invalid argument")
        if self._writableYN and offset > endBuf:
            offset = endBuf

        # offset is now absolute; make sure it falls within the buffered
        # window.
        if offset < 0:
            raise IOError, (22, "Invalid argument")
        delta = offset - self._currFoPos

        if offset < beginBuf:
            raise IOError, (22, "Invalid argument; attempted seek before "
                                "beginning of buffer")
        elif offset > endBuf:
            # Past the end of the window: read forward (and discard).
            if self._readableYN:
                while delta:
                    x = min(_StreamBuf.__MAX_BUFIO_SIZE, delta)
                    self.read(x)
                    delta = delta - x
        else:
            # Within the window: just reposition the buffer.
            self._bufIO.seek(tell + delta, 0)
            if _StreamBuf.VERSION == 1:
                self._currFoPos = self.__checkInt(self._currFoPos)
            self._currFoPos = self._currFoPos + self._bufIO.tell() - tell

    def tell(self):
        """Return current position in the file-like object.
        """
        return self._currFoPos

    def close(self):
        """Flush the buffer.
        NOTE: the underlying file object is NOT closed, just flushed --
        mapping as closely as possible to GzipFile's behavior.
        """
        self.flush()
        self._closedYN = 1

    def flush(self):
        """Flush the buffer.
        """
        if self._closedYN:
            raise ValueError, "I/O operation on closed _StreamBuf object"
        if self._readableYN:
            # Nothing to push anywhere for a read-only stream.
            pass
        if self._writableYN:
            self.fo.write(self._refactorBufIO(1))
        if _StreamBuf.VERSION == 1:
            # Older Pythons: the underlying object may not provide flush().
            try:
                self.fo.flush()
            except AttributeError:
                pass
            return
        self.fo.flush()

    def __repr__(self):
        ret = ''
        if self._closedYN:
            ret = "<closed gzipstream._StreamBuf instance, mode '%s' at %s>" % \
                  (self.mode, id(self))
        else:
            ret = "<open gzipstream._StreamBuf instance, mode '%s' at %s>" % \
                  (self.mode, id(self))
        return ret

    def __checkInt(self, i):
        """Promote an int counter to a long before it can overflow.
        Might be faster just to declare them longs.
        Python versions 1.5.2 & 2.1.* ONLY!
        """
        if i > _StreamBuf.__INT_CHECK_SIZE and type(i) == IntType:
            i = long(i)
        return i

    def __debugThread(self):
        """XXX: Only used for debugging. Runs a thread that watches some
        tell-tale warning flags that something bad is happening.
        """
        while not self._closedYN and not self.__mutexOnYN:
            if self._lenBufIO != len(self._bufIO.getvalue()):
                sys.stderr.write('XXX: ERROR! _lenBufIO != len(...): %s != %s\n'
                                 % (self._lenBufIO, len(self._bufIO.getvalue())))
                sys.stderr.write('XXX: %s\n' % repr(self))
            if self._lenBufIO > _StreamBuf.__ABS_MAX_BUFIO_SIZE*2:
                sys.stderr.write('XXX: ERROR! StringIO buffer WAY too big: %s\n'
                                 % self._lenBufIO)
                sys.stderr.write('XXX: %s\n' % repr(self))
            time.sleep(1)