]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.lz4/src/net/jpountz/lz4/LZ4BlockOutputStream.java
Merge "(refs #7607) Async module for running functions asynchronously"
[simantics/platform.git] / bundles / org.simantics.lz4 / src / net / jpountz / lz4 / LZ4BlockOutputStream.java
1 package net.jpountz.lz4;
2
3 /*
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 import java.io.FilterOutputStream;
18 import java.io.IOException;
19 import java.io.OutputStream;
20 import java.util.zip.Checksum;
21
22 import net.jpountz.util.SafeUtils;
23 import net.jpountz.xxhash.StreamingXXHash32;
24 import net.jpountz.xxhash.XXHashFactory;
25
26 /**
27  * Streaming LZ4.
28  * <p>
29  * This class compresses data into fixed-size blocks of compressed data.
30  * @see LZ4BlockInputStream
31  */
32 public final class LZ4BlockOutputStream extends FilterOutputStream {
33
34   static final byte[] MAGIC = new byte[] { 'L', 'Z', '4', 'B', 'l', 'o', 'c', 'k' };
35   static final int MAGIC_LENGTH = MAGIC.length;
36
37   static final int HEADER_LENGTH =
38       MAGIC_LENGTH // magic bytes
39       + 1          // token
40       + 4          // compressed length
41       + 4          // decompressed length
42       + 4;         // checksum
43
44   static final int COMPRESSION_LEVEL_BASE = 10;
45   static final int MIN_BLOCK_SIZE = 64;
46   static final int MAX_BLOCK_SIZE = 1 << (COMPRESSION_LEVEL_BASE + 0x0F);
47
48   static final int COMPRESSION_METHOD_RAW = 0x10;
49   static final int COMPRESSION_METHOD_LZ4 = 0x20;
50
51   static final int DEFAULT_SEED = 0x9747b28c;
52
53   private static int compressionLevel(int blockSize) {
54     if (blockSize < MIN_BLOCK_SIZE) {
55       throw new IllegalArgumentException("blockSize must be >= " + MIN_BLOCK_SIZE + ", got " + blockSize);
56     } else if (blockSize > MAX_BLOCK_SIZE) {
57       throw new IllegalArgumentException("blockSize must be <= " + MAX_BLOCK_SIZE + ", got " + blockSize);
58     }
59     int compressionLevel = 32 - Integer.numberOfLeadingZeros(blockSize - 1); // ceil of log2
60     assert (1 << compressionLevel) >= blockSize;
61     assert blockSize * 2 > (1 << compressionLevel);
62     compressionLevel = Math.max(0, compressionLevel - COMPRESSION_LEVEL_BASE);
63     assert compressionLevel >= 0 && compressionLevel <= 0x0F;
64     return compressionLevel;
65   }
66
67   private final int blockSize;
68   private final int compressionLevel;
69   private final LZ4Compressor compressor;
70   private final Checksum checksum;
71   private final byte[] buffer;
72   private final byte[] compressedBuffer;
73   private final boolean syncFlush;
74   private boolean finished;
75   private int o;
76
77   /**
78    * Create a new {@link OutputStream} with configurable block size. Large
79    * blocks require more memory at compression and decompression time but
80    * should improve the compression ratio.
81    *
82    * @param out         the {@link OutputStream} to feed
83    * @param blockSize   the maximum number of bytes to try to compress at once,
84    *                    must be >= 64 and <= 32 M
85    * @param compressor  the {@link LZ4Compressor} instance to use to compress
86    *                    data
87    * @param checksum    the {@link Checksum} instance to use to check data for
88    *                    integrity.
89    * @param syncFlush   true if pending data should also be flushed on {@link #flush()}
90    */
91   public LZ4BlockOutputStream(OutputStream out, int blockSize, LZ4Compressor compressor, Checksum checksum, boolean syncFlush) {
92     super(out);
93     this.blockSize = blockSize;
94     this.compressor = compressor;
95     this.checksum = checksum;
96     this.compressionLevel = compressionLevel(blockSize);
97     this.buffer = new byte[blockSize];
98     final int compressedBlockSize = HEADER_LENGTH + compressor.maxCompressedLength(blockSize);
99     this.compressedBuffer = new byte[compressedBlockSize];
100     this.syncFlush = syncFlush;
101     o = 0;
102     finished = false;
103     System.arraycopy(MAGIC, 0, compressedBuffer, 0, MAGIC_LENGTH);
104   }
105
106   /**
107    * Create a new instance which checks stream integrity using
108    * {@link StreamingXXHash32} and doesn't sync flush.
109    * @see #LZ4BlockOutputStream(OutputStream, int, LZ4Compressor, Checksum, boolean)
110    * @see StreamingXXHash32#asChecksum()
111    */
112   public LZ4BlockOutputStream(OutputStream out, int blockSize, LZ4Compressor compressor) {
113     this(out, blockSize, compressor, XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum(), false);
114   }
115
116   /**
117    * Create a new instance which compresses with the standard LZ4 compression
118    * algorithm.
119    * @see #LZ4BlockOutputStream(OutputStream, int, LZ4Compressor)
120    * @see LZ4Factory#fastCompressor()
121    */
122   public LZ4BlockOutputStream(OutputStream out, int blockSize) {
123     this(out, blockSize, LZ4Factory.fastestInstance().fastCompressor());
124   }
125
126   /**
127    * Create a new instance which compresses into blocks of 64 KB.
128    * @see #LZ4BlockOutputStream(OutputStream, int)
129    */
130   public LZ4BlockOutputStream(OutputStream out) {
131     this(out, 1 << 16);
132   }
133
134   private void ensureNotFinished() {
135     if (finished) {
136       throw new IllegalStateException("This stream is already closed");
137     }
138   }
139
140   @Override
141   public void write(int b) throws IOException {
142     ensureNotFinished();
143     if (o == blockSize) {
144       flushBufferedData();
145     }
146     buffer[o++] = (byte) b;
147   }
148
149   @Override
150   public void write(byte[] b, int off, int len) throws IOException {
151     SafeUtils.checkRange(b, off, len);
152     ensureNotFinished();
153
154     while (o + len > blockSize) {
155       final int l = blockSize - o;
156       System.arraycopy(b, off, buffer, o, blockSize - o);
157       o = blockSize;
158       flushBufferedData();
159       off += l;
160       len -= l;
161     }
162     System.arraycopy(b, off, buffer, o, len);
163     o += len;
164   }
165
166   @Override
167   public void write(byte[] b) throws IOException {
168     ensureNotFinished();
169     write(b, 0, b.length);
170   }
171
172   @Override
173   public void close() throws IOException {
174     if (!finished) {
175       finish();
176     }
177     if (out != null) {
178       out.close();
179       out = null;
180     }
181   }
182
183   private void flushBufferedData() throws IOException {
184     if (o == 0) {
185       return;
186     }
187     checksum.reset();
188     checksum.update(buffer, 0, o);
189     final int check = (int) checksum.getValue();
190     int compressedLength = compressor.compress(buffer, 0, o, compressedBuffer, HEADER_LENGTH);
191     final int compressMethod;
192     if (compressedLength >= o) {
193       compressMethod = COMPRESSION_METHOD_RAW;
194       compressedLength = o;
195       System.arraycopy(buffer, 0, compressedBuffer, HEADER_LENGTH, o);
196     } else {
197       compressMethod = COMPRESSION_METHOD_LZ4;
198     }
199
200     compressedBuffer[MAGIC_LENGTH] = (byte) (compressMethod | compressionLevel);
201     writeIntLE(compressedLength, compressedBuffer, MAGIC_LENGTH + 1);
202     writeIntLE(o, compressedBuffer, MAGIC_LENGTH + 5);
203     writeIntLE(check, compressedBuffer, MAGIC_LENGTH + 9);
204     assert MAGIC_LENGTH + 13 == HEADER_LENGTH;
205     out.write(compressedBuffer, 0, HEADER_LENGTH + compressedLength);
206     o = 0;
207   }
208
209   /**
210    * Flush this compressed {@link OutputStream}.
211    *
212    * If the stream has been created with <code>syncFlush=true</code>, pending
213    * data will be compressed and appended to the underlying {@link OutputStream}
214    * before calling {@link OutputStream#flush()} on the underlying stream.
215    * Otherwise, this method just flushes the underlying stream, so pending
216    * data might not be available for reading until {@link #finish()} or
217    * {@link #close()} is called.
218    */
219   @Override
220   public void flush() throws IOException {
221     if (out != null) {
222       if (syncFlush) {
223         flushBufferedData();
224       }
225       out.flush();
226     }
227   }
228
229   /**
230    * Same as {@link #close()} except that it doesn't close the underlying stream.
231    * This can be useful if you want to keep on using the underlying stream.
232    */
233   public void finish() throws IOException {
234     ensureNotFinished();
235     flushBufferedData();
236     compressedBuffer[MAGIC_LENGTH] = (byte) (COMPRESSION_METHOD_RAW | compressionLevel);
237     writeIntLE(0, compressedBuffer, MAGIC_LENGTH + 1);
238     writeIntLE(0, compressedBuffer, MAGIC_LENGTH + 5);
239     writeIntLE(0, compressedBuffer, MAGIC_LENGTH + 9);
240     assert MAGIC_LENGTH + 13 == HEADER_LENGTH;
241     out.write(compressedBuffer, 0, HEADER_LENGTH);
242     finished = true;
243     out.flush();
244   }
245
246   private static void writeIntLE(int i, byte[] buf, int off) {
247     buf[off++] = (byte) i;
248     buf[off++] = (byte) (i >>> 8);
249     buf[off++] = (byte) (i >>> 16);
250     buf[off++] = (byte) (i >>> 24);
251   }
252
253   @Override
254   public String toString() {
255     return getClass().getSimpleName() + "(out=" + out + ", blockSize=" + blockSize
256         + ", compressor=" + compressor + ", checksum=" + checksum + ")";
257   }
258
259 }