1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
15 import java.io.IOException;
16 import java.io.RandomAccessFile;
17 import java.nio.ByteBuffer;
18 import java.nio.channels.FileChannel;
19 import java.util.Arrays;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.Iterator;
24 import org.simantics.db.ChangeSet;
25 import org.simantics.db.Resource;
26 import org.simantics.db.common.internal.config.InternalClientConfig;
27 import org.simantics.db.exception.RuntimeDatabaseException;
28 import org.simantics.db.impl.ResourceImpl;
31 * @author Antti Villberg
32 * @author Tuukka Lehtonen
34 public final class ClientChangesImpl implements ChangeSet {
36 private static final boolean DEBUG = false;
38 private static final int BUFFER_SIZE = 1 << 17;
40 private static final byte CLAIM = 1;
41 private static final byte DENY = 2;
42 private static final byte VALUE = 3;
43 private static final byte TERM = 4;
44 private static final byte INVALIDATE = 5;
46 private SessionImplSocket session;
49 * True if enough data has been written to overflow the normal buffer which
50 * will result in data being written to disk.
52 private boolean overflow = false;
54 private final byte[] readBytes = new byte[BUFFER_SIZE];
55 // By default the write buffer will be the exact same as the read buffer
56 // since initially we are not forced to write changes to the disk.
57 private byte[] writeBytes = readBytes;
59 private int readBufferIndex = 0;
60 private int writeBufferIndex = 0;
63 * <code>true</code> if something has been written since last call to
64 * {@link #resetRead()}. By default it is <code>true</code> since
65 * {@link #resetRead()} must on its first invocation write the {@link #TERM}
66 * constant into the stream.
68 private boolean wroteSinceLastRead = true;
70 private final ByteBuffer readBuffer = ByteBuffer.wrap(readBytes);
71 private ByteBuffer writeBuffer = ByteBuffer.wrap(writeBytes);
73 private File tempFile;
74 private RandomAccessFile file;
75 private FileChannel channel;
76 private long currentPosition = 0L;
77 private long readPosition = 0L;
78 private long writePosition = 0L;
80 static abstract class NoRemoveIterator<T> implements Iterator<T> {
82 public void remove() {
83 throw new UnsupportedOperationException();
88 * Base class for all immutable collections returned by
89 * {@link ClientChangesImpl}.
93 static class CollectionBase<T> implements Collection<T> {
97 throw new UnsupportedOperationException();
101 public boolean isEmpty() {
102 throw new UnsupportedOperationException();
106 public boolean contains(Object o) {
107 throw new UnsupportedOperationException();
111 public Iterator<T> iterator() {
112 throw new UnsupportedOperationException();
116 public Object[] toArray() {
117 throw new UnsupportedOperationException();
121 public <TT> TT[] toArray(TT[] a) {
122 throw new UnsupportedOperationException();
126 public boolean add(T e) {
127 throw new UnsupportedOperationException();
131 public boolean remove(Object o) {
132 throw new UnsupportedOperationException();
136 public boolean containsAll(Collection<?> c) {
137 throw new UnsupportedOperationException();
141 public boolean addAll(Collection<? extends T> c) {
142 throw new UnsupportedOperationException();
146 public boolean removeAll(Collection<?> c) {
147 throw new UnsupportedOperationException();
151 public boolean retainAll(Collection<?> c) {
152 throw new UnsupportedOperationException();
156 public void clear() {
157 throw new UnsupportedOperationException();
/**
 * Creates an empty change set bound to the given session.
 *
 * @param session the session whose {@code getId} / {@code getResource}
 *        methods this change set calls to convert between
 *        {@link Resource} objects and integer ids
 */
ClientChangesImpl(SessionImplSocket session) {
    this.session = session;
}
167 public String toString() {
168 final int LIMIT = 10000;
170 for (StatementChange s : changedStatements()) {
171 String op = s.isClaim() ? "add" : "rem";
172 to += "\n" + op + " " + s.getSubject() + " " + s.getPredicate() + " " + s.getObject();
173 if (to.length() > LIMIT) {
178 for (Resource r : changedValues()) {
180 if (to.length() > LIMIT) {
188 private Resource getResource(int id) {
189 return session.getResource(id);
193 public synchronized Collection<StatementChange> changedStatements() {
196 return Collections.emptyList();
198 return new CollectionBase<StatementChange>() {
200 StatementChange seek() {
203 byte method = readByte();
209 Resource s = getResource(readInt());
210 Resource p = getResource(readInt());
211 Resource o = getResource(readInt());
212 return new StatementChangeImpl(s, p, o, true);
215 Resource s = getResource(readInt());
216 Resource p = getResource(readInt());
217 Resource o = getResource(readInt());
218 return new StatementChangeImpl(s, p, o, false);
222 // Skipping value operations.
223 getResource(readInt());
228 } catch(Exception e) {
229 throw new RuntimeDatabaseException(e);
233 private StatementChange next = seek();
236 public Iterator<StatementChange> iterator() {
237 return new NoRemoveIterator<StatementChange>() {
239 public boolean hasNext() {
243 public StatementChange next() {
244 StatementChange result = next;
256 public Collection<Resource> changedValues() {
259 return Collections.emptyList();
261 return new CollectionBase<Resource>() {
266 byte method = readByte();
272 getResource(readInt());
273 getResource(readInt());
274 getResource(readInt());
278 getResource(readInt());
279 getResource(readInt());
280 getResource(readInt());
285 return getResource(readInt());
289 } catch(Exception e) {
294 private Resource next = seek();
297 public Iterator<Resource> iterator() {
298 return new NoRemoveIterator<Resource>() {
300 public boolean hasNext() {
304 public Resource next() {
305 Resource result = next;
317 public Collection<Resource> changedResources() {
320 return Collections.emptyList();
322 return new CollectionBase<Resource>() {
328 byte method = readByte();
334 getResource(readInt());
335 getResource(readInt());
336 getResource(readInt());
340 getResource(readInt());
341 getResource(readInt());
342 getResource(readInt());
346 getResource(readInt());
350 return getResource(readInt());
354 } catch(Exception e) {
360 private Resource next = seek();
363 public Iterator<Resource> iterator() {
364 return new NoRemoveIterator<Resource>() {
366 public boolean hasNext() {
370 public Resource next() {
371 Resource result = next;
383 public boolean isEmpty() {
386 return writeBufferIndex == 0 && !overflow;
389 private boolean notEmpty = false;
390 void setNotEmpty(boolean notEmpty) {
391 this.notEmpty = notEmpty;
394 void claim(Resource s, Resource p, Resource o) {
396 writeInt(session.getId((ResourceImpl) s));
397 writeInt(session.getId((ResourceImpl) p));
398 writeInt(session.getId((ResourceImpl) o));
399 if(DEBUG) System.err.println("claim1 $" + s.getResourceId() + " $" + p.getResourceId() + " $" + o.getResourceId());
402 private int sCache1 = -1;
403 private int sCache2 = -1;
404 private int sCache3 = -1;
406 final void invalidate(int s) {
410 } else if (s == sCache2) {
414 } else if (s == sCache3) {
425 writeByte(INVALIDATE);
428 if(DEBUG) System.err.println("invalidate2 " + s);
432 void claim(int s, int p, int o) {
437 if(DEBUG) System.err.println("claim2 " + s + " " + p + " " + o);
440 void deny(Resource s, Resource p, Resource o) {
442 writeInt(session.getId((ResourceImpl)s));
443 writeInt(session.getId((ResourceImpl)p));
444 writeInt(session.getId((ResourceImpl)o));
445 if(DEBUG) System.err.println("deny2 " + session.getId((ResourceImpl)s) + " " + session.getId((ResourceImpl)p) + " " + session.getId((ResourceImpl)o));
448 void deny(int s, int p, int o) {
453 if(DEBUG) System.err.println("deny2 " + s + " " + p + " " + o);
456 void claimValue(Resource r) {
458 writeInt(session.getId((ResourceImpl)r));
459 if(DEBUG) System.err.println("claimOrDenyValue1 $" + r.getResourceId());
462 void claimValue(int r) {
465 if(DEBUG) System.err.println("deny2 " + r);
470 private byte readByte() {
471 byte result = readBytes[readBufferIndex++];
472 if(readBufferIndex == BUFFER_SIZE) fillReadBuffer();
476 private int readInt() {
477 if(readBufferIndex < (BUFFER_SIZE-4)) {
479 ((readBytes[readBufferIndex++] & 0xff) |
480 ((readBytes[readBufferIndex++] & 0xff) << 8) |
481 ((readBytes[readBufferIndex++] & 0xff) << 16) |
482 ((readBytes[readBufferIndex++] & 0xff) << 24));
485 int has = BUFFER_SIZE-readBufferIndex;
487 if(has == 0) fillReadBuffer();
488 result = (int)(readBytes[readBufferIndex++] & 0xff);
489 if(has == 1) fillReadBuffer();
490 result |= (int)((readBytes[readBufferIndex++] & 0xff) << 8);
491 if(has == 2) fillReadBuffer();
492 result |= (int)((readBytes[readBufferIndex++] & 0xff) << 16);
493 if(has == 3) fillReadBuffer();
494 result |= (int)((readBytes[readBufferIndex++] & 0xff) << 24);
495 if(has == 4) fillReadBuffer();
500 private void writeByte(int b) {
501 writeBytes[writeBufferIndex++] = (byte)b;
502 if(writeBufferIndex == BUFFER_SIZE) flushWriteBuffer(BUFFER_SIZE);
503 wroteSinceLastRead = true;
506 private void writeInt(int i) {
507 if(writeBufferIndex < (BUFFER_SIZE-4)) {
508 writeBytes[writeBufferIndex++] = (byte)(i&0xff);
509 writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);
510 writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);
511 writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);
513 int has = BUFFER_SIZE-writeBufferIndex;
514 if(has == 0) flushWriteBuffer(BUFFER_SIZE);
515 writeBytes[writeBufferIndex++] = (byte)(i&0xff);
516 if(has == 1) flushWriteBuffer(BUFFER_SIZE);
517 writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);
518 if(has == 2) flushWriteBuffer(BUFFER_SIZE);
519 writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);
520 if(has == 3) flushWriteBuffer(BUFFER_SIZE);
521 writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);
522 if(has == 4) flushWriteBuffer(BUFFER_SIZE);
524 wroteSinceLastRead = true;
527 private int tryFlushBuffer(int size) throws IOException {
529 System.err.println("tryFlushBuffer(" + size + "): tempFile=" + tempFile);
531 if (tempFile == null) {
532 File base = InternalClientConfig.DB_CLIENT_TEMP_DIR;
534 tempFile = File.createTempFile("cset-", ".cs", base);
535 file = new RandomAccessFile(tempFile, "rwd");
536 channel = file.getChannel();
538 System.err.println("Allocated temporary file for changeset: " + tempFile);
542 writeBuffer.position(0);
543 writeBuffer.limit(size);
544 for (; wrote < size;) {
545 wrote += channel.write(writeBuffer);
550 private void flushWriteBuffer(int size) {
552 System.err.println("flushBuffer(" + size + ") overflow=" + overflow);
557 // Change set is about to become file-backed.
558 // Initialize separate buffer for writing.
559 writeBytes = Arrays.copyOf(readBytes, readBytes.length);
560 writeBuffer = ByteBuffer.wrap(writeBytes);
564 writeBufferIndex = 0;
567 setFilePosition(writePosition);
568 wrote = tryFlushBuffer(size);
569 } catch (IOException e) {
571 throw new RuntimeDatabaseException(e);
574 currentPosition = channel.position();
575 writePosition = currentPosition;
576 } catch (IOException e) {
580 System.err.println("flushWriteBuffer wrote " + wrote + " bytes");
584 private boolean resetRead() {
586 System.err.println("resetRead: overflow=" + overflow + ", file=" + tempFile);
588 // Flush temporary file first if necessary.
589 if (wroteSinceLastRead) {
591 System.err.println("\tTerminating write of changeset " + tempFile);
593 wroteSinceLastRead = false;
597 System.err.println("\tReading changeset from " + tempFile.getAbsolutePath());
598 flushWriteBuffer(writeBufferIndex);
600 setFilePosition(readPosition);
606 } catch (IOException e) {
612 private void fillReadBuffer() {
616 setFilePosition(readPosition);
617 readBuffer.position(0);
618 readBuffer.limit(BUFFER_SIZE);
619 for (;read < BUFFER_SIZE;) {
620 int got = channel.read(readBuffer);
625 } catch (IOException e) {
629 currentPosition = channel.position();
630 readPosition = currentPosition;
631 } catch (IOException e) {
635 System.err.println("fillReadBuffer read " + read + " bytes");
641 * @return <code>true</code> if file position was modified
642 * @throws IOException
644 private boolean setFilePosition(long newPosition) throws IOException {
645 if (channel == null) {
648 if (newPosition != currentPosition) {
650 System.err.println("setting new file position(" + newPosition + "), old position=" + currentPosition);
651 channel.position(newPosition);
652 currentPosition = newPosition;
659 protected void finalize() throws Throwable {
664 public void close() {
669 System.err.println("Closing temporary changeset input stream " + file);
672 } catch (IOException e) {
677 if (tempFile != null) {
679 System.err.println("Deleting temporary changeset file " + tempFile);
685 public static void main(String[] args) {
686 ClientChangesImpl ccs = new ClientChangesImpl(null);
687 System.out.println("isEmpty=" + ccs.isEmpty());
688 System.out.println("clientChanges=" + ccs.toString());
689 System.out.println("isEmpty=" + ccs.isEmpty());