1 /*******************************************************************************
\r
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
\r
3 * in Industry THTH ry.
\r
4 * All rights reserved. This program and the accompanying materials
\r
5 * are made available under the terms of the Eclipse Public License v1.0
\r
6 * which accompanies this distribution, and is available at
\r
7 * http://www.eclipse.org/legal/epl-v10.html
\r
10 * VTT Technical Research Centre of Finland - initial API and implementation
\r
11 *******************************************************************************/
\r
12 package fi.vtt.simantics.procore.internal;
\r
14 import java.io.File;
\r
15 import java.io.IOException;
\r
16 import java.io.RandomAccessFile;
\r
17 import java.nio.ByteBuffer;
\r
18 import java.nio.channels.FileChannel;
\r
19 import java.util.Arrays;
\r
20 import java.util.Collection;
\r
21 import java.util.Collections;
\r
22 import java.util.Iterator;
\r
24 import org.simantics.db.ChangeSet;
\r
25 import org.simantics.db.Resource;
\r
26 import org.simantics.db.common.internal.config.InternalClientConfig;
\r
27 import org.simantics.db.exception.RuntimeDatabaseException;
\r
28 import org.simantics.db.impl.ResourceImpl;
\r
31 * @author Antti Villberg
\r
32 * @author Tuukka Lehtonen
\r
34 public final class ClientChangesImpl implements ChangeSet {
\r
36 private static final boolean DEBUG = false;
\r
38 private static final int BUFFER_SIZE = 1 << 17;
\r
40 private static final byte CLAIM = 1;
\r
41 private static final byte DENY = 2;
\r
42 private static final byte VALUE = 3;
\r
43 private static final byte TERM = 4;
\r
44 private static final byte INVALIDATE = 5;
\r
46 private SessionImplSocket session;
\r
49 * True if enough data has been written to overflow the normal buffer which
\r
50 * will result in data being written to disk.
\r
52 private boolean overflow = false;
\r
54 private final byte[] readBytes = new byte[BUFFER_SIZE];
\r
55 // By default the write buffer will be the exact same as the read buffer
\r
56 // since initially were not forced to write changes to the disk.
\r
57 private byte[] writeBytes = readBytes;
\r
59 private int readBufferIndex = 0;
\r
60 private int writeBufferIndex = 0;
\r
63 * <code>true</code> if something has been written since last call to
\r
64 * {@link #resetRead()}. By default it is <code>true</code> since
\r
65 * {@link #resetRead()} must on its first invocation write the {@link #TERM}
\r
66 * constant into the stream.
\r
68 private boolean wroteSinceLastRead = true;
\r
70 private final ByteBuffer readBuffer = ByteBuffer.wrap(readBytes);
\r
71 private ByteBuffer writeBuffer = ByteBuffer.wrap(writeBytes);
\r
73 private File tempFile;
\r
74 private RandomAccessFile file;
\r
75 private FileChannel channel;
\r
76 private long currentPosition = 0L;
\r
77 private long readPosition = 0L;
\r
78 private long writePosition = 0L;
\r
80 static abstract class NoRemoveIterator<T> implements Iterator<T> {
\r
82 public void remove() {
\r
83 throw new UnsupportedOperationException();
\r
88 * Base class for all immutable collections returned by
\r
89 * {@link ClientChangesImpl}.
\r
93 static class CollectionBase<T> implements Collection<T> {
\r
97 throw new UnsupportedOperationException();
\r
101 public boolean isEmpty() {
\r
102 throw new UnsupportedOperationException();
\r
106 public boolean contains(Object o) {
\r
107 throw new UnsupportedOperationException();
\r
111 public Iterator<T> iterator() {
\r
112 throw new UnsupportedOperationException();
\r
116 public Object[] toArray() {
\r
117 throw new UnsupportedOperationException();
\r
121 public <TT> TT[] toArray(TT[] a) {
\r
122 throw new UnsupportedOperationException();
\r
126 public boolean add(T e) {
\r
127 throw new UnsupportedOperationException();
\r
131 public boolean remove(Object o) {
\r
132 throw new UnsupportedOperationException();
\r
136 public boolean containsAll(Collection<?> c) {
\r
137 throw new UnsupportedOperationException();
\r
141 public boolean addAll(Collection<? extends T> c) {
\r
142 throw new UnsupportedOperationException();
\r
146 public boolean removeAll(Collection<?> c) {
\r
147 throw new UnsupportedOperationException();
\r
151 public boolean retainAll(Collection<?> c) {
\r
152 throw new UnsupportedOperationException();
\r
156 public void clear() {
\r
157 throw new UnsupportedOperationException();
\r
162 ClientChangesImpl(SessionImplSocket session) {
\r
163 this.session = session;
\r
167 public String toString() {
\r
168 final int LIMIT = 10000;
\r
170 for (StatementChange s : changedStatements()) {
\r
171 String op = s.isClaim() ? "add" : "rem";
\r
172 to += "\n" + op + " " + s.getSubject() + " " + s.getPredicate() + " " + s.getObject();
\r
173 if (to.length() > LIMIT) {
\r
174 to += "... more.\n";
\r
178 for (Resource r : changedValues()) {
\r
179 to += "\nmod " + r;
\r
180 if (to.length() > LIMIT) {
\r
188 private Resource getResource(int id) {
\r
189 return session.getResource(id);
\r
193 public synchronized Collection<StatementChange> changedStatements() {
\r
196 return Collections.emptyList();
\r
198 return new CollectionBase<StatementChange>() {
\r
200 StatementChange seek() {
\r
203 byte method = readByte();
\r
209 Resource s = getResource(readInt());
\r
210 Resource p = getResource(readInt());
\r
211 Resource o = getResource(readInt());
\r
212 return new StatementChangeImpl(s, p, o, true);
\r
215 Resource s = getResource(readInt());
\r
216 Resource p = getResource(readInt());
\r
217 Resource o = getResource(readInt());
\r
218 return new StatementChangeImpl(s, p, o, false);
\r
222 // Skipping value opererations.
\r
223 getResource(readInt());
\r
228 } catch(Exception e) {
\r
229 throw new RuntimeDatabaseException(e);
\r
233 private StatementChange next = seek();
\r
236 public Iterator<StatementChange> iterator() {
\r
237 return new NoRemoveIterator<StatementChange>() {
\r
239 public boolean hasNext() {
\r
240 return next != null;
\r
243 public StatementChange next() {
\r
244 StatementChange result = next;
\r
256 public Collection<Resource> changedValues() {
\r
259 return Collections.emptyList();
\r
261 return new CollectionBase<Resource>() {
\r
266 byte method = readByte();
\r
272 getResource(readInt());
\r
273 getResource(readInt());
\r
274 getResource(readInt());
\r
278 getResource(readInt());
\r
279 getResource(readInt());
\r
280 getResource(readInt());
\r
285 return getResource(readInt());
\r
289 } catch(Exception e) {
\r
294 private Resource next = seek();
\r
297 public Iterator<Resource> iterator() {
\r
298 return new NoRemoveIterator<Resource>() {
\r
300 public boolean hasNext() {
\r
301 return next != null;
\r
304 public Resource next() {
\r
305 Resource result = next;
\r
317 public Collection<Resource> changedResources() {
\r
320 return Collections.emptyList();
\r
322 return new CollectionBase<Resource>() {
\r
328 byte method = readByte();
\r
334 getResource(readInt());
\r
335 getResource(readInt());
\r
336 getResource(readInt());
\r
340 getResource(readInt());
\r
341 getResource(readInt());
\r
342 getResource(readInt());
\r
346 getResource(readInt());
\r
350 return getResource(readInt());
\r
354 } catch(Exception e) {
\r
360 private Resource next = seek();
\r
363 public Iterator<Resource> iterator() {
\r
364 return new NoRemoveIterator<Resource>() {
\r
366 public boolean hasNext() {
\r
367 return next != null;
\r
370 public Resource next() {
\r
371 Resource result = next;
\r
383 public boolean isEmpty() {
\r
386 return writeBufferIndex == 0 && !overflow;
\r
389 private boolean notEmpty = false;
\r
390 void setNotEmpty(boolean notEmpty) {
\r
391 this.notEmpty = notEmpty;
\r
394 void claim(Resource s, Resource p, Resource o) {
\r
396 writeInt(session.getId((ResourceImpl) s));
\r
397 writeInt(session.getId((ResourceImpl) p));
\r
398 writeInt(session.getId((ResourceImpl) o));
\r
399 if(DEBUG) System.err.println("claim1 $" + s.getResourceId() + " $" + p.getResourceId() + " $" + o.getResourceId());
\r
402 private int sCache1 = -1;
\r
403 private int sCache2 = -1;
\r
404 private int sCache3 = -1;
\r
406 final void invalidate(int s) {
\r
410 } else if (s == sCache2) {
\r
414 } else if (s == sCache3) {
\r
425 writeByte(INVALIDATE);
\r
428 if(DEBUG) System.err.println("invalidate2 " + s);
\r
432 void claim(int s, int p, int o) {
\r
437 if(DEBUG) System.err.println("claim2 " + s + " " + p + " " + o);
\r
440 void deny(Resource s, Resource p, Resource o) {
\r
442 writeInt(session.getId((ResourceImpl)s));
\r
443 writeInt(session.getId((ResourceImpl)p));
\r
444 writeInt(session.getId((ResourceImpl)o));
\r
445 if(DEBUG) System.err.println("deny2 " + session.getId((ResourceImpl)s) + " " + session.getId((ResourceImpl)p) + " " + session.getId((ResourceImpl)o));
\r
448 void deny(int s, int p, int o) {
\r
453 if(DEBUG) System.err.println("deny2 " + s + " " + p + " " + o);
\r
456 void claimValue(Resource r) {
\r
458 writeInt(session.getId((ResourceImpl)r));
\r
459 if(DEBUG) System.err.println("claimOrDenyValue1 $" + r.getResourceId());
\r
462 void claimValue(int r) {
\r
465 if(DEBUG) System.err.println("deny2 " + r);
\r
470 private byte readByte() {
\r
471 byte result = readBytes[readBufferIndex++];
\r
472 if(readBufferIndex == BUFFER_SIZE) fillReadBuffer();
\r
476 private int readInt() {
\r
477 if(readBufferIndex < (BUFFER_SIZE-4)) {
\r
478 int result = (int)
\r
479 ((readBytes[readBufferIndex++] & 0xff) |
\r
480 ((readBytes[readBufferIndex++] & 0xff) << 8) |
\r
481 ((readBytes[readBufferIndex++] & 0xff) << 16) |
\r
482 ((readBytes[readBufferIndex++] & 0xff) << 24));
\r
485 int has = BUFFER_SIZE-readBufferIndex;
\r
487 if(has == 0) fillReadBuffer();
\r
488 result = (int)(readBytes[readBufferIndex++] & 0xff);
\r
489 if(has == 1) fillReadBuffer();
\r
490 result |= (int)((readBytes[readBufferIndex++] & 0xff) << 8);
\r
491 if(has == 2) fillReadBuffer();
\r
492 result |= (int)((readBytes[readBufferIndex++] & 0xff) << 16);
\r
493 if(has == 3) fillReadBuffer();
\r
494 result |= (int)((readBytes[readBufferIndex++] & 0xff) << 24);
\r
495 if(has == 4) fillReadBuffer();
\r
500 private void writeByte(int b) {
\r
501 writeBytes[writeBufferIndex++] = (byte)b;
\r
502 if(writeBufferIndex == BUFFER_SIZE) flushWriteBuffer(BUFFER_SIZE);
\r
503 wroteSinceLastRead = true;
\r
506 private void writeInt(int i) {
\r
507 if(writeBufferIndex < (BUFFER_SIZE-4)) {
\r
508 writeBytes[writeBufferIndex++] = (byte)(i&0xff);
\r
509 writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);
\r
510 writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);
\r
511 writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);
\r
513 int has = BUFFER_SIZE-writeBufferIndex;
\r
514 if(has == 0) flushWriteBuffer(BUFFER_SIZE);
\r
515 writeBytes[writeBufferIndex++] = (byte)(i&0xff);
\r
516 if(has == 1) flushWriteBuffer(BUFFER_SIZE);
\r
517 writeBytes[writeBufferIndex++] = (byte)((i>>>8)&0xff);
\r
518 if(has == 2) flushWriteBuffer(BUFFER_SIZE);
\r
519 writeBytes[writeBufferIndex++] = (byte)((i>>>16)&0xff);
\r
520 if(has == 3) flushWriteBuffer(BUFFER_SIZE);
\r
521 writeBytes[writeBufferIndex++] = (byte)((i>>>24)&0xff);
\r
522 if(has == 4) flushWriteBuffer(BUFFER_SIZE);
\r
524 wroteSinceLastRead = true;
\r
527 private int tryFlushBuffer(int size) throws IOException {
\r
529 System.err.println("tryFlushBuffer(" + size + "): tempFile=" + tempFile);
\r
531 if (tempFile == null) {
\r
532 File base = InternalClientConfig.DB_CLIENT_TEMP_DIR;
\r
534 tempFile = File.createTempFile("cset-", ".cs", base);
\r
535 file = new RandomAccessFile(tempFile, "rwd");
\r
536 channel = file.getChannel();
\r
538 System.err.println("Allocated temporary file for changeset: " + tempFile);
\r
542 writeBuffer.position(0);
\r
543 writeBuffer.limit(size);
\r
544 for (; wrote < size;) {
\r
545 wrote += channel.write(writeBuffer);
\r
550 private void flushWriteBuffer(int size) {
\r
552 System.err.println("flushBuffer(" + size + ") overflow=" + overflow);
\r
557 // Change set is about to become file-backed.
\r
558 // Initialize separate buffer for writing.
\r
559 writeBytes = Arrays.copyOf(readBytes, readBytes.length);
\r
560 writeBuffer = ByteBuffer.wrap(writeBytes);
\r
564 writeBufferIndex = 0;
\r
567 setFilePosition(writePosition);
\r
568 wrote = tryFlushBuffer(size);
\r
569 } catch (IOException e) {
\r
570 e.printStackTrace();
\r
571 throw new RuntimeDatabaseException(e);
\r
574 currentPosition = channel.position();
\r
575 writePosition = currentPosition;
\r
576 } catch (IOException e) {
\r
577 e.printStackTrace();
\r
580 System.err.println("flushWriteBuffer wrote " + wrote + " bytes");
\r
584 private boolean resetRead() {
\r
586 System.err.println("resetRead: overflow=" + overflow + ", file=" + tempFile);
\r
588 // Flush temporary file first if necessary.
\r
589 if (wroteSinceLastRead) {
\r
591 System.err.println("\tTerminating write of changeset " + tempFile);
\r
593 wroteSinceLastRead = false;
\r
597 System.err.println("\tReading changeset from " + tempFile.getAbsolutePath());
\r
598 flushWriteBuffer(writeBufferIndex);
\r
600 setFilePosition(readPosition);
\r
603 readBufferIndex = 0;
\r
606 } catch (IOException e) {
\r
607 e.printStackTrace();
\r
612 private void fillReadBuffer() {
\r
613 readBufferIndex = 0;
\r
616 setFilePosition(readPosition);
\r
617 readBuffer.position(0);
\r
618 readBuffer.limit(BUFFER_SIZE);
\r
619 for (;read < BUFFER_SIZE;) {
\r
620 int got = channel.read(readBuffer);
\r
625 } catch (IOException e) {
\r
626 e.printStackTrace();
\r
629 currentPosition = channel.position();
\r
630 readPosition = currentPosition;
\r
631 } catch (IOException e) {
\r
632 e.printStackTrace();
\r
635 System.err.println("fillReadBuffer read " + read + " bytes");
\r
640 * @param newPosition
\r
641 * @return <code>true</code> if file position was modified
\r
642 * @throws IOException
\r
644 private boolean setFilePosition(long newPosition) throws IOException {
\r
645 if (channel == null) {
\r
648 if (newPosition != currentPosition) {
\r
650 System.err.println("setting new file position(" + newPosition + "), old position=" + currentPosition);
\r
651 channel.position(newPosition);
\r
652 currentPosition = newPosition;
\r
659 protected void finalize() throws Throwable {
\r
664 public void close() {
\r
667 if (file != null) {
\r
669 System.err.println("Closing temporary changeset input stream " + file);
\r
672 } catch (IOException e) {
\r
677 if (tempFile != null) {
\r
679 System.err.println("Deleting temporary changeset file " + tempFile);
\r
685 public static void main(String[] args) {
\r
686 ClientChangesImpl ccs = new ClientChangesImpl(null);
\r
687 System.out.println("isEmpty=" + ccs.isEmpty());
\r
688 System.out.println("clientChanges=" + ccs.toString());
\r
689 System.out.println("isEmpty=" + ccs.isEmpty());
\r