1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.db.impl.graph;
15 import java.io.Closeable;
17 import java.io.FileInputStream;
18 import java.io.FileOutputStream;
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.io.OutputStream;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.FileChannel;
24 import java.nio.file.Files;
25 import java.util.ArrayList;
26 import java.util.HashSet;
28 import java.util.TreeMap;
29 import java.util.UUID;
31 import org.eclipse.core.runtime.Platform;
32 import org.simantics.databoard.Bindings;
33 import org.simantics.databoard.accessor.Accessor;
34 import org.simantics.databoard.binding.Binding;
35 import org.simantics.databoard.binding.error.BindingConstructionException;
36 import org.simantics.databoard.serialization.Serializer;
37 import org.simantics.databoard.type.Datatype;
38 import org.simantics.databoard.util.binary.RandomAccessBinary;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.ReadGraph;
41 import org.simantics.db.Resource;
42 import org.simantics.db.Session;
43 import org.simantics.db.Statement;
44 import org.simantics.db.VirtualGraph;
45 import org.simantics.db.WriteGraph;
46 import org.simantics.db.WriteOnlyGraph;
47 import org.simantics.db.common.MetadataUtils;
48 import org.simantics.db.common.request.WriteOnlyRequest;
49 import org.simantics.db.common.utils.Logger;
50 import org.simantics.db.exception.BindingException;
51 import org.simantics.db.exception.ClusterSetExistException;
52 import org.simantics.db.exception.DatabaseException;
53 import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
54 import org.simantics.db.exception.NoSingleResultException;
55 import org.simantics.db.exception.ResourceNotFoundException;
56 import org.simantics.db.exception.ServiceException;
57 import org.simantics.db.impl.DebugPolicy;
58 import org.simantics.db.impl.ResourceImpl;
59 import org.simantics.db.request.DelayedWrite;
60 import org.simantics.db.request.WriteOnly;
61 import org.simantics.db.request.WriteResult;
62 import org.simantics.db.request.WriteTraits;
63 import org.simantics.db.service.ByteReader;
64 import org.simantics.db.service.ClusteringSupport;
65 import org.simantics.db.service.TransferableGraphSupport;
66 import org.simantics.db.service.XSupport;
67 import org.simantics.layer0.Layer0;
68 import org.simantics.utils.datastructures.MapList;
70 import gnu.trove.map.hash.TIntIntHashMap;
71 import gnu.trove.map.hash.TObjectIntHashMap;
74 * Write graph implementation that does not modify the database
75 * immediately but with an explicit commit method. All read operations
76 * return results based on the old graph.
78 public class DelayedWriteGraph extends ReadGraphImpl implements WriteGraph, ByteReader, Closeable {
// Debug flag, fixed to false. NOTE(review): presumably gates the System.out diagnostics; the guarding lines are not visible here - confirm.
80 private static final boolean DEBUG = false;
// Size in bytes of the staging buffer `bytes`; the read/write helpers flush or refill when byteIndex reaches it.
81 private static final int BUFFER_SIZE = 512*1024;
// Binding whose id is recorded after raw values by the TransferableGraphSupport wrapper returned from getService.
83 private static final PassthroughSerializerBinding PASSTHROUGH = new PassthroughSerializerBinding();
// One-byte operation codes. CLAIM_NOINVERSE, CLAIM_VALUE_B, DENY_VALUE and COMMIT_AND_CONTINUE
// are written with writeByte(...) by the recording methods and matched again in commit().
// NOTE(review): the uses of TERM, CLAIM and DENY are not visible in this excerpt - confirm.
85 private final int TERM = 0;
86 private final int CLAIM = 1;
87 private final int CLAIM_NOINVERSE = 2;
88 private final int CLAIM_VALUE_B = 4;
89 private final int DENY = 5;
90 private final int DENY_VALUE = 6;
91 private final int COMMIT_AND_CONTINUE = 7;
// Recording state of a delayed write. newSync() passes the same instance to the graph it creates.
93 private static class State {
// Streams over the temporary file: `out` is opened in writeReset, `in` is opened in commit; both are closed in close().
95 public FileOutputStream out;
96 public FileInputStream in;
// Indexed by delayed id; holds InternalResource placeholders as well as resources registered by getId.
97 public ArrayList<Resource> idToResource = new ArrayList<Resource>();
// ResourceImpl.id -> delayed id, filled by getId.
98 public TIntIntHashMap externalToId = new TIntIntHashMap();
// Binding registry in both directions, filled by getBindingId.
99 public ArrayList<Binding> idToBinding = new ArrayList<Binding>();
100 public TObjectIntHashMap<Binding> bindingToId = new TObjectIntHashMap<Binding>();
// Cluster sets registered through newClusterSet; looked up with InternalResource keys in newResource(Resource) and commit.
101 public Set<Resource> clusterSets = new HashSet<Resource>();
// Cluster sets registered through newClusterSet for resources that are not InternalResource.
102 public Set<Resource> clusterSetsForExistingResources = new HashSet<Resource>();
// Incremented by newCluster().
103 public int clusterCount = 0;
// Cluster id used by newResource() when no default cluster set is set.
104 public long defaultCluster;
// Cluster set used by newResource() when non-null; swapped by setClusterSet4NewResource.
105 public Resource defaultClusterSet;
// Counters incremented and printed in commit(); fileCount is only printed in the visible lines.
106 public int statementCount = 0;
107 public int valueCount = 0;
108 public int fileCount = 0;
// Recording state (see State); created in the one-argument constructor or supplied by the caller.
111 private State writeState;
// Metadata collected through addMetadata and copied into the target graph's metadata in commit().
112 private TreeMap<String,byte[]> metadata = new TreeMap<String,byte[]>();
// Channel of the temporary file: taken from writeState.out in writeReset and from writeState.in in commit.
114 private FileChannel channel;
// Staging buffer, its ByteBuffer view used for channel I/O, and the next read/write position inside `bytes`.
115 private byte[] bytes = new byte[BUFFER_SIZE];
116 private ByteBuffer bb = ByteBuffer.wrap(bytes);
117 private int byteIndex = 0;
// Session of the graph passed to the constructor; used to look up WriteSupport and TransferableGraphSupport.
120 private Session session;
122 public static Resource convertDelayedResource(Resource r) {
123 if (r instanceof InternalResource)
124 return ((InternalResource) r).resource;
// Placeholder Resource handed out by newResource(...) while recording; identified by its delayed id.
128 private static class InternalResource implements Resource {
// Real resource; assigned in commit() from w.newResource(...). Null until then.
132 Resource resource = null;
// Cluster set requested at creation, or null when the instance was created with a cluster id.
133 Resource clusterSet = null;
// Creates a placeholder bound to a cluster id.
135 public InternalResource(int id, long clusterId) {
137 this.clusterId = clusterId;
// Creates a placeholder bound to a cluster set.
140 public InternalResource(int id, Resource clusterSet) {
142 this.clusterSet = clusterSet;
// A placeholder has no database id; always throws.
146 public long getResourceId() {
147 throw new UnsupportedOperationException();
151 public Resource get() {
156 public boolean isPersistent() {
// Placeholders compare by delayed id. NOTE(review): the result for other Resource kinds is on a line not visible here.
161 public int compareTo(Resource o) {
162 if(o instanceof InternalResource)
163 return Integer.compare(id, ((InternalResource)o).id);
// Hash derived from the delayed id.
168 public int hashCode() {
169 final int prime = 31;
171 result = prime * result + id;
176 public int getThreadHash() {
// NOTE(review): only the instanceof check and cast are visible; presumably equality is by id - confirm.
181 public boolean equals(Object obj) {
186 if (!(obj instanceof InternalResource))
188 InternalResource other = (InternalResource) obj;
// Delegates to equals(Object).
195 public boolean equalsResource(Resource other) {
196 return equals(other);
// Text form; the verbose variant is chosen by DebugPolicy.VERBOSE.
200 public String toString() {
201 StringBuilder sb = new StringBuilder(32);
202 if(DebugPolicy.VERBOSE) {
203 sb.append("[delayed id=");
211 return sb.toString();
// Returns the delayed id used to encode `resource` in the operation stream.
215 private int getId(Resource resource) {
// Placeholders carry their own id.
216 if(resource instanceof InternalResource)
217 return ((InternalResource)resource).id;
// Any other resource is cast to ResourceImpl and looked up by its `id` field.
219 ResourceImpl r = (ResourceImpl)resource;
220 int id = writeState.externalToId.get(r.id);
// NOTE(review): the condition that decides between reusing `id` and registering a new one is not visible here.
// Registration: the new id is the next index of idToResource, and the mapping is stored in both structures.
224 id = writeState.idToResource.size();
225 writeState.idToResource.add(resource);
226 writeState.externalToId.put(r.id, id);
232 private Resource getResource(int id) {
233 return writeState.idToResource.get(id);
236 private int getBindingId(Binding binding) {
237 if(writeState.bindingToId.contains(binding))
238 return writeState.bindingToId.get(binding);
240 int id = writeState.idToBinding.size();
241 writeState.idToBinding.add(binding);
242 writeState.bindingToId.put(binding, id);
/**
 * Creates a delayed write graph over {@code g} with a fresh recording state
 * and a newly allocated default cluster.
 *
 * @param g the graph used for reads; it is cast to {@link ReadGraphImpl}
 * @throws IOException declared in the signature
 */
public DelayedWriteGraph(ReadGraph g) throws IOException {
    super((ReadGraphImpl)g);
    this.session = g.getSession();
    this.b = Layer0.getInstance(g);
    this.writeState = new State();
    // newCluster() reads writeState, so the state must already be in place.
    this.writeState.defaultCluster = newCluster();
}
/**
 * Creates a delayed write graph over {@code g} that records into an
 * existing state.
 *
 * @param g     the graph used for reads; it is cast to {@link ReadGraphImpl}
 * @param state the recording state to use
 */
public DelayedWriteGraph(ReadGraph g, State state) {
    super((ReadGraphImpl)g);
    this.writeState = state;
    this.session = g.getSession();
    this.b = Layer0.getInstance(g);
}
262 public DelayedWriteGraph newSync() {
263 return new DelayedWriteGraph(this, writeState);
267 public void claim(Resource subject, Resource predicate, Resource object)
268 throws ServiceException {
269 assert(subject != null);
270 assert(predicate != null);
271 assert(object != null);
273 Resource inverse = getPossibleInverse(predicate);
274 claim(subject, predicate, inverse, object);
278 public void addLiteral(Resource resource, Resource predicate, Resource inverse, Resource type, Object value,
279 Binding binding) throws BindingException,
280 ManyObjectsForFunctionalRelationException, ServiceException {
281 Resource valueResource = newResource();
282 claimValue(valueResource, value, binding);
283 claim(valueResource, b.InstanceOf, null, type);
284 claim(resource, predicate, inverse, valueResource);
// Records a new literal whose type is derived from the Java class of `value` via getType.
// NOTE(review): the lines that choose between the two paths below are not visible here;
// presumably the explicit data-type path is taken when getType finds no built-in type - confirm.
288 public void addLiteral(Resource resource, Resource predicate,
289 Resource inverse, Object value, Binding binding)
290 throws BindingException, ManyObjectsForFunctionalRelationException,
293 Resource type = getType(value);
// Path 1: create the literal plus a separate data type resource whose value is binding.type().
295 Resource literal = newResource();
297 Resource dataType = newResource();
298 claim(dataType, b.InstanceOf, null, b.DataType);
299 claimValue(dataType, binding.type(), DATA_TYPE_BINDING_INTERNAL);
300 claim(literal, b.HasDataType, null, dataType);
301 claim(literal, b.InstanceOf, null, type);
302 claimValue(literal, value, binding);
303 claim(resource, predicate, inverse, literal);
// Path 2: delegate to the overload that takes an explicit type.
305 addLiteral(resource, predicate, inverse, type, value, binding);
311 public <T extends Accessor> T newLiteral(Resource resource, Resource predicate, Datatype datatype, Object initialValue)
312 throws DatabaseException {
314 throw new UnsupportedOperationException();
319 public RandomAccessBinary createRandomAccessBinary (Resource resource, Resource predicate, Datatype datatype, Object initialValue) throws DatabaseException {
320 throw new UnsupportedOperationException();
324 public RandomAccessBinary createRandomAccessBinary(Resource resource, Datatype datatype, Object initialValue) throws DatabaseException {
325 throw new UnsupportedOperationException();
329 public void claimLiteral(Resource resource, Resource predicate, Object value) throws ManyObjectsForFunctionalRelationException, ServiceException {
332 Binding b = Bindings.getBinding(value.getClass());
333 claimLiteral(resource, predicate, value, b);
334 } catch (BindingConstructionException e) {
335 throw new IllegalArgumentException(e);
336 } catch (BindingException e) {
337 throw new IllegalArgumentException(e);
// Records a literal value for (resource, predicate) with an explicit binding.
343 public void claimLiteral(Resource resource, Resource predicate, Object value, Binding binding) throws BindingException,
344 ManyObjectsForFunctionalRelationException, ServiceException {
// Only resources that are not placeholders are queried for an existing statement.
346 Statement valueStatement = null;
347 if(!(resource instanceof InternalResource)) valueStatement = getPossibleStatement(resource, predicate);
// An existing statement whose subject equals `resource` is reused: only the object's value is rewritten.
349 if(valueStatement != null && resource.equals(valueStatement.getSubject())) {
351 claimValue(valueStatement.getObject(), value, binding);
// Otherwise a new literal is created.
// NOTE(review): the lines guarding the data-type part below are not visible here.
355 Resource type = getType(value);
356 Resource literal = newResource();
359 Resource dataType = newResource();
360 claim(dataType, b.InstanceOf, null, b.DataType);
361 claimValue(dataType, binding.type(), DATA_TYPE_BINDING_INTERNAL);
362 claim(literal, b.HasDataType, dataType);
364 claim(literal, b.InstanceOf, null, type);
365 claimValue(literal, value, binding);
366 claim(resource, predicate, literal);
373 public void claimLiteral(Resource resource, Resource predicate,
374 Resource inverse, Resource type, Object value)
375 throws BindingException, ManyObjectsForFunctionalRelationException,
379 Binding b = Bindings.getBinding(value.getClass());
380 claimLiteral(resource, predicate, inverse, type, value, b);
381 } catch (BindingConstructionException e) {
382 throw new IllegalArgumentException(e);
383 } catch (BindingException e) {
384 throw new IllegalArgumentException(e);
390 public void claimLiteral(Resource resource, Resource predicate,
391 Resource inverse, Resource type, Object value, Binding binding)
392 throws BindingException, ManyObjectsForFunctionalRelationException,
395 Statement valueStatement = (resource instanceof InternalResource) ? null : getPossibleStatement(resource, predicate);
397 if(valueStatement != null && resource.equals(valueStatement.getSubject())) {
399 claimValue(valueStatement.getObject(), value, binding);
403 Resource valueResource = newResource();
404 claim(valueResource, b.InstanceOf, null, type);
405 claim(resource, predicate, inverse, valueResource);
406 claimValue(valueResource, value, binding);
413 public void claimLiteral(Resource resource, Resource predicate,
414 Resource type, Object value) throws BindingException,
415 ManyObjectsForFunctionalRelationException, ServiceException {
418 Binding b = Bindings.getBinding(value.getClass());
419 claimLiteral(resource, predicate, type, value, b);
420 } catch (BindingConstructionException e) {
421 throw new IllegalArgumentException(e);
422 } catch (BindingException e) {
423 throw new IllegalArgumentException(e);
429 public void claimLiteral(Resource resource, Resource predicate,
430 Resource type, Object value, Binding binding)
431 throws BindingException, ManyObjectsForFunctionalRelationException,
435 Resource inverse = getSingleObject(predicate, b.InverseOf);
436 claimLiteral(resource, predicate, inverse, type, value, binding);
437 } catch (NoSingleResultException e) {
438 throw new ServiceException(e);
// Records the removal of statements of `subject`; does nothing for delayed placeholders.
444 public void deny(Resource subject) throws ServiceException {
445 assert(subject != null);
446 if(!(subject instanceof InternalResource)) {
// Iterates the statements returned for b.IsWeaklyRelatedTo.
// NOTE(review): the loop body is not visible here; presumably each statement is denied - confirm.
448 for (Statement statement : getStatements(subject, b.IsWeaklyRelatedTo)) {
// A functional-relation violation is reported as ServiceException.
451 } catch (ManyObjectsForFunctionalRelationException e) {
452 throw new ServiceException(e);
458 public void deny(Resource subject, Resource predicate)
459 throws ServiceException {
460 assert(subject != null);
461 if(!(subject instanceof InternalResource)) {
462 for (Resource object : getObjects(subject, predicate)) {
463 deny(subject, predicate, object);
469 public void deny(Resource subject, Resource predicate, Resource object) throws ServiceException {
470 denyStatement(subject, predicate, object);
474 public void denyStatement(Resource subject, Resource predicate, Resource object) throws ServiceException {
475 deny(subject, predicate, getPossibleInverse(predicate), object);
479 public void deny(Statement statement) throws ServiceException {
480 Resource predicate = statement.getPredicate();
481 deny(statement.getSubject(), predicate, getPossibleInverse(predicate), statement.getObject());
// Records the removal of the value statement (resource, predicate, *); does nothing for delayed placeholders.
485 public void denyValue(Resource resource, Resource predicate)
486 throws ManyObjectsForFunctionalRelationException, ServiceException {
487 assert(resource != null);
488 assert(predicate != null);
490 if(!(resource instanceof InternalResource)) {
491 Statement valueStatement = getPossibleStatement(resource, predicate);
// Statements that are asserted for `resource` are left alone.
493 if (valueStatement != null && !valueStatement.isAsserted(resource)) {
// NOTE(review): what is done with `value` after the statement is denied is on a line not visible here.
494 Resource value = valueStatement.getObject();
495 deny(valueStatement);
502 public Resource newResource() throws ServiceException {
503 if(writeState.defaultClusterSet != null) return newResource(writeState.defaultClusterSet);
504 else return newResource(writeState.defaultCluster);
508 public Resource newResource(long clusterId) throws ServiceException {
509 int id = writeState.idToResource.size();
510 InternalResource ret = new InternalResource(id, clusterId);
511 writeState.idToResource.add(ret);
516 public Resource newResource(Resource clusterSet) throws ServiceException {
518 if ((clusterSet instanceof InternalResource)) {
519 if(!writeState.clusterSets.contains(clusterSet))
520 throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
522 WriteSupport ws = session.getService(WriteSupport.class);
523 if (!ws.hasClusterSet(null, clusterSet))
524 if(!writeState.clusterSetsForExistingResources.contains(clusterSet))
525 throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
528 int id = writeState.idToResource.size();
529 InternalResource ret = new InternalResource(id, clusterSet);
530 writeState.idToResource.add(ret);
// Registers `clusterSet` as a new cluster set to be created at commit.
535 public void newClusterSet(Resource clusterSet) throws ServiceException {
// NOTE(review): the line preceding this print is not visible here; presumably a DEBUG guard - confirm.
537 System.out.println("new cluster set=" + clusterSet);
538 boolean existingResource = !(clusterSet instanceof InternalResource);
// A non-placeholder resource must not already be a cluster set according to WriteSupport.
539 if (existingResource) {
540 WriteSupport ws = session.getService(WriteSupport.class);
541 if (ws.hasClusterSet(null, clusterSet))
542 throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
543 writeState.clusterSetsForExistingResources.add(clusterSet);
// Set.add returning false means the cluster set was already registered in this state.
545 if(!writeState.clusterSets.add(clusterSet))
546 throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
550 public Resource setClusterSet4NewResource(Resource clusterSet)
551 throws ServiceException {
552 Resource existing = writeState.defaultClusterSet;
553 writeState.defaultClusterSet = clusterSet;
// Records a statement, with or without an inverse, into the operation stream.
558 public void claim(Resource subject, Resource predicate, Resource inverse,
559 Resource object) throws ServiceException {
560 assert(subject != null);
561 assert(predicate != null);
562 assert(object != null);
// The inverse is dropped when it is null, or when the statement is its own inverse
// (subject equals object and inverse equals predicate).
564 if(inverse != null && !(subject.equals(object) && inverse.equals(predicate))) {
// With inverse: four ids. NOTE(review): the opcode written before them is on a line not visible here.
566 writeInt(getId(subject));
567 writeInt(getId(predicate));
568 writeInt(getId(inverse));
569 writeInt(getId(object));
// Without inverse: CLAIM_NOINVERSE followed by three ids.
571 writeByte(CLAIM_NOINVERSE);
572 writeInt(getId(subject));
573 writeInt(getId(predicate));
574 writeInt(getId(object));
// Any failure is wrapped into a ServiceException.
576 } catch(Exception e) {
577 throw new ServiceException(e);
// Records the removal of a statement into the operation stream.
582 public void deny(Resource subject, Resource predicate, Resource inverse,
583 Resource object) throws ServiceException {
584 assert(subject != null);
585 assert(predicate != null);
586 assert(object != null);
// NOTE(review): the opcode written before these ids is on a line not visible here.
589 writeInt(getId(subject));
590 writeInt(getId(predicate));
// NOTE(review): what is written when `inverse` is null is not visible here. commit() treats a
// non-positive value in this slot as "no inverse", which would also match a resource with delayed id 0 - confirm.
591 if(inverse != null) writeInt(getId(inverse));
593 writeInt(getId(object));
// Any failure is wrapped into a ServiceException.
594 } catch(Exception e) {
595 throw new ServiceException(e);
600 public void deny(Resource subject, Resource predicate, Resource inverse,
601 Resource object, VirtualGraph graph) throws ServiceException {
602 throw new UnsupportedOperationException();
606 public void claimValue(Resource resource, Object value)
607 throws ServiceException {
609 Binding binding = Bindings.getBinding(value.getClass());
610 claimValue(resource, value, binding);
611 } catch (BindingConstructionException e) {
612 throw new ServiceException(e);
// OutputStream handed to Serializer.serialize in claimValue.
// NOTE(review): the body of write(int) is not visible here; presumably it forwards the byte to writeByte - confirm.
616 private OutputStream valueWriter = new OutputStream() {
618 public void write(int b) throws IOException {
// Records a value for `resource`, serialized with the given binding.
624 public void claimValue(Resource resource, Object value, Binding binding)
625 throws ServiceException {
// Record layout starts with the opcode and the resource id.
628 writeByte(CLAIM_VALUE_B);
629 writeInt(getId(resource));
630 Serializer serializer = binding.serializer();
// NOTE(review): the lines that use `size`, and whatever follows the payload, are not visible here.
631 int size = serializer.getSize(value);
// The serialized bytes go through valueWriter.
633 serializer.serialize(valueWriter, value);
// Serialization failures are logged and rethrown as ServiceException.
635 } catch(IOException e) {
636 Logger.defaultLogError(e);
637 throw new ServiceException(e);
642 public void denyValue(Resource resource) throws ServiceException {
643 writeByte(DENY_VALUE);
644 writeInt(getId(resource));
648 public void denyValue(Resource resource, VirtualGraph graph) throws ServiceException {
649 throw new UnsupportedOperationException();
653 public void flushCluster() throws ServiceException {
654 writeState.defaultCluster = newCluster();
658 public void flushCluster(Resource r) throws ServiceException {
659 throw new ServiceException("Operation flushCluster(" + r + " not implemented.");
// Writes `size` bytes of the staging buffer to the temporary file.
// NOTE(review): the lines that prepare `bb` and byteIndex before the write are not visible here.
662 private void writeReset(int size) {
// The temp file is created on first use, under <workspace>/tempFiles/delayed, with a random UUID as its name.
667 if (writeState.tempFile == null) {
668 File workspace = Platform.getLocation().toFile();
669 File temp = new File(workspace, "tempFiles");
670 File base = new File(temp, "delayed");
671 Files.createDirectories(base.toPath());
672 writeState.tempFile = new File(base, UUID.randomUUID().toString());
673 writeState.out = new FileOutputStream(writeState.tempFile);
674 channel = writeState.out.getChannel();
// FileChannel.write may write fewer bytes than requested, hence the loop.
// NOTE(review): the condition leading to the error log and the update of `got` are not visible here.
677 for (int got=0;got < size;) {
678 int n = channel.write(bb);
680 Logger.defaultLogError(new Exception("FileChannel.write returned " + n));
// I/O failures are logged; nothing is rethrown in the visible lines.
685 } catch (IOException e) {
686 Logger.defaultLogError("Failed to write buffer of " + size + " bytes to temporary file " + writeState.tempFile, e);
// Refills the staging buffer from the temporary file during replay.
// NOTE(review): the lines that rewind byteIndex and `bb`, and the end-of-stream handling, are not visible here.
690 private void reset() {
// Nothing is read when no channel is open.
693 if (channel != null) {
// FileChannel.read may return fewer bytes than requested, hence the loop up to BUFFER_SIZE.
695 for(int got=0; got < BUFFER_SIZE;) {
696 int n = channel.read(bb);
// I/O failures are logged; nothing is rethrown in the visible lines.
701 } catch (IOException e) {
702 Logger.defaultLogError("FileChannel.read failed", e);
// Appends a 32-bit integer to the staging buffer, least significant byte first.
707 private void writeInt(int i) {
// Fast path: more than four bytes are free, so all four bytes fit.
708 if(byteIndex < (BUFFER_SIZE-4)) {
709 bytes[byteIndex++] = (byte)(i&0xff);
710 bytes[byteIndex++] = (byte)((i>>>8)&0xff);
711 bytes[byteIndex++] = (byte)((i>>>16)&0xff);
712 bytes[byteIndex++] = (byte)((i>>>24)&0xff);
// NOTE(review): if this check belongs to the fast path, byteIndex cannot reach BUFFER_SIZE here - confirm.
713 if (byteIndex == BUFFER_SIZE)
714 writeReset(BUFFER_SIZE);
// Slow path: `has` is the number of free bytes before the write. The buffer is flushed
// with writeReset(BUFFER_SIZE) after exactly `has` bytes have been stored.
716 int has = BUFFER_SIZE-byteIndex;
717 if(has == 0) writeReset(BUFFER_SIZE);
718 bytes[byteIndex++] = (byte)(i&0xff);
719 if(has == 1) writeReset(BUFFER_SIZE);
720 bytes[byteIndex++] = (byte)((i>>>8)&0xff);
721 if(has == 2) writeReset(BUFFER_SIZE);
722 bytes[byteIndex++] = (byte)((i>>>16)&0xff);
723 if(has == 3) writeReset(BUFFER_SIZE);
724 bytes[byteIndex++] = (byte)((i>>>24)&0xff);
725 if(has == 4) writeReset(BUFFER_SIZE);
// Reads a 32-bit integer from the staging buffer, least significant byte first (mirror of writeInt).
729 private int readInt() {
// Fast path: more than four bytes remain, so the value is assembled in one expression.
730 if(byteIndex < (BUFFER_SIZE-4)) {
732 ((bytes[byteIndex++] & 0xff) |
733 ((bytes[byteIndex++] & 0xff)<<8) |
734 ((bytes[byteIndex++] & 0xff)<<16) |
735 ((bytes[byteIndex++] & 0xff)<<24));
// Slow path: `has` is the number of bytes left before the read. The buffer is refilled
// with reset() after exactly `has` bytes have been consumed.
738 int has = BUFFER_SIZE-byteIndex;
740 if(has == 0) reset();
741 result = (int)(bytes[byteIndex++] & 0xff);
742 if(has == 1) reset();
743 result |= (int)((bytes[byteIndex++] & 0xff) <<8);
744 if(has == 2) reset();
745 result |= (int)((bytes[byteIndex++] & 0xff) <<16);
746 if(has == 3) reset();
747 result |= (int)((bytes[byteIndex++] & 0xff) <<24);
748 if(has == 4) reset();
753 private byte readByte() {
754 byte result = bytes[byteIndex++];
755 if(byteIndex == BUFFER_SIZE) reset();
759 private void writeByte(int b) {
760 bytes[byteIndex++] = (byte)b;
761 if(byteIndex == BUFFER_SIZE) writeReset(BUFFER_SIZE);
// Appends a byte array to the operation stream.
// NOTE(review): the branch condition and the position updates are not visible here.
764 private void writeBytes(byte[] data) {
// `has` is the free space left in the staging buffer.
765 int has = BUFFER_SIZE-byteIndex;
766 int amount = data.length;
// Case 1: the whole array is copied into the staging buffer.
768 System.arraycopy(data, 0, bytes, byteIndex, amount);
// Case 2: the first `has` bytes fill the buffer, the buffer is flushed, and `data` is wrapped
// in a second ByteBuffer. Presumably the remainder is written to the channel - confirm.
771 System.arraycopy(data, 0, bytes, byteIndex, has);
772 writeReset(BUFFER_SIZE);
773 ByteBuffer bb2 = ByteBuffer.wrap(data);
// I/O failures are logged; nothing is rethrown in the visible lines.
777 } catch (IOException e) {
778 Logger.defaultLogError("FileChannel.write failed", e);
// Reads `amount` bytes from the operation stream into `result`, allocating the array when null is passed.
// NOTE(review): the branch condition, position updates and return statements are not visible here.
783 public byte[] readBytes(byte[] result, int amount) {
784 if(result == null) result = new byte[amount];
// `has` is the number of unread bytes left in the staging buffer.
785 int has = BUFFER_SIZE-byteIndex;
// Case 1: everything is copied from the staging buffer.
787 System.arraycopy(bytes, byteIndex, result, 0, amount);
// Case 2: the buffered `has` bytes are copied first, then the channel is read in a loop until `amount` is reached.
790 System.arraycopy(bytes, byteIndex, result, 0, has);
791 ByteBuffer bb2 = ByteBuffer.wrap(result);
793 for(int got=has;got<amount;)
// NOTE(review): FileChannel.read returns -1 at end of stream and that value is added to `got`
// before any visible check - confirm how end of stream is handled.
795 got += channel.read(bb2);
797 // End-of-stream, why log this?
800 } catch (IOException e) {
801 Logger.defaultLogError("FileChannel.read failed", e);
// Replays everything recorded in this delayed graph into the write-only graph `w`:
// first the placeholder resources are created for real, then the operation stream is read back
// and each record is applied to `w`.
// NOTE(review): several control-flow lines (try/switch/case labels, DEBUG guards) are not visible in this excerpt.
808 public void commit(final WriteOnlyGraph w, final WriteTraits traits) throws ServiceException {
// The lookup maps are dropped here; replay only uses idToResource.
809 writeState.bindingToId = null;
810 writeState.externalToId = null;
813 if (writeState.out != null) {
814 // Flush current buffer to file only if backing file has already
815 // been taken into use.
817 writeReset(byteIndex);
// Closes the output stream through try-with-resources after forcing the channel contents to disk.
819 try (OutputStream out = writeState.out) {
820 channel.force(false);
821 } catch (IOException e) {
822 throw new ServiceException(e);
824 writeState.out = null;
// Reopens the temporary file for reading; `channel` now refers to the input side.
828 writeState.in = new FileInputStream(writeState.tempFile);
829 channel = writeState.in.getChannel();
830 } catch (IOException e) {
831 throw new ServiceException(e);
// Metadata collected during recording is copied to the target graph.
835 w.getMetadata().putAll(metadata);
837 TransferableGraphSupport tgs = w.getService(TransferableGraphSupport.class);
839 // First create all resources defined by clusterId
// Negative cluster ids (see newCluster) are grouped per id; positive ids are created
// directly in that cluster. Each created resource replaces its placeholder in idToResource.
840 MapList<Long,InternalResource> clusterAssignment = new MapList<Long,InternalResource>();
841 for(Resource r : writeState.idToResource) {
842 if(r instanceof InternalResource) {
843 InternalResource ir = (InternalResource)r;
844 if(ir.clusterId < 0) {
846 System.out.println("ASSIGN CLUSTER " + ir + " => " + ir.clusterId);
847 clusterAssignment.add(ir.clusterId, ir);
848 } else if(ir.clusterId > 0) {
850 System.out.println("-CREATED RESOURCE WITH EXISTING CLUSTER ID: " + ir);
851 ir.resource = w.newResource(ir.clusterId);
852 writeState.idToResource.set(ir.id, ir.resource);
853 if (writeState.clusterSets.contains(ir)) {
854 w.newClusterSet(ir.resource);
856 System.out.println("--CREATED NEW INTERNAL RESOURCE CLUSTER SET: " + ir.resource);
// Resources grouped under a logical (negative) cluster id are created with w.newResource().
// NOTE(review): whatever is done per cluster key before the inner loop is not visible here.
863 for(Long clusterKey : clusterAssignment.getKeys()) {
865 System.out.println("CREATE LOGICAL CLUSTER: " + clusterKey);
867 for(InternalResource ir : clusterAssignment.getValuesUnsafe(clusterKey)) {
869 System.out.println("-CREATED RESOURCE: " + ir);
870 ir.resource = w.newResource();
871 writeState.idToResource.set(ir.id, ir.resource);
872 if (writeState.clusterSets.contains(ir)) {
873 w.newClusterSet(ir.resource);
875 System.out.println("--CREATED NEW INTERNAL RESOURCE CLUSTER SET: " + ir.resource);
880 // Create cluster sets for all existing resources (not InternalResource)
881 // before proceeding to create resources.
882 for(Resource existingResource : writeState.clusterSetsForExistingResources) {
883 w.newClusterSet(existingResource);
885 System.out.println("CREATED NEW CLUSTER SET: " + existingResource);
888 // Then create all resources defined by cluster set
// A placeholder cluster set is resolved through its `resource` field, so it is expected
// to have been created already when it is used here.
889 for(Resource r : writeState.idToResource) {
890 if(r instanceof InternalResource) {
891 InternalResource ir = (InternalResource)r;
892 Resource clusterSet = ir.clusterSet;
894 if (clusterSet != null) {
896 System.out.println("NEW RESOURCE " + ir + " for cluster set " + clusterSet);
897 if(clusterSet instanceof InternalResource) {
898 ir.resource = w.newResource(((InternalResource)clusterSet).resource);
900 ir.resource = w.newResource(clusterSet);
903 System.out.println(" => " + ir.resource);
904 writeState.idToResource.set(ir.id, ir.resource);
905 if(writeState.clusterSets.contains(ir)) {
907 System.out.println(" ==> NEW CLUSTER SET");
908 w.newClusterSet(ir.resource);
// Replay: each record starts with a one-byte opcode read by readByte().
916 bb.limit(BUFFER_SIZE);
920 byte method = readByte();
// Summary counters.
924 System.out.println("Resources: " + writeState.idToResource.size());
925 System.out.println("Statements: " + writeState.statementCount);
926 System.out.println("Values: " + writeState.valueCount);
927 System.out.println("Files: " + writeState.fileCount);
928 System.out.println("Clusters: " + writeState.clusterCount);
// Claim with inverse: four ids; counted as two statements.
933 writeState.statementCount += 2;
934 Resource subject = getResource(readInt());
935 Resource predicate = getResource(readInt());
936 Resource inverse = getResource(readInt());
937 Resource object = getResource(readInt());
938 w.claim(subject, predicate, inverse, object);
// Claim without inverse: three ids.
940 case CLAIM_NOINVERSE: {
941 ++writeState.statementCount;
942 Resource subject = getResource(readInt());
943 Resource predicate = getResource(readInt());
944 Resource object = getResource(readInt());
945 w.claim(subject, predicate, null, object);
// Deny record. NOTE(review): `inv` is read on a line not visible here; a value <= 0 means "no inverse",
// which cannot be told apart from delayed id 0.
948 Resource subject = getResource(readInt());
949 Resource predicate = getResource(readInt());
951 Resource inverse = null;
952 if(inv > 0) inverse = getResource(inv);
953 Resource object = getResource(readInt());
// Non-persistent endpoints: each direction is denied in the virtual graph that provides it.
954 if(!subject.isPersistent() || !object.isPersistent()) {
955 VirtualGraph statementProvider1 = processor.getProvider(subject, predicate, object);
957 VirtualGraph statementProvider2 = processor.getProvider(object, inverse, subject);
958 if(statementProvider2 != null)
959 w.deny(object, inverse, null, subject, statementProvider2);
961 if(statementProvider1 != null)
962 w.deny(subject, predicate, null, object, statementProvider1);
964 w.deny(subject, predicate, inverse, object, null);
// Deny-value record: non-persistent subjects go through their value provider.
968 Resource subject = getResource(readInt());
969 if(!subject.isPersistent()) {
970 VirtualGraph provider = processor.getValueProvider(subject);
972 w.denyValue(subject, provider);
974 w.denyValue(subject);
// Value record: the payload is streamed through this object's ByteReader implementation (readBytes).
// NOTE(review): `len` is read on a line not visible here.
977 case CLAIM_VALUE_B: {
978 ++writeState.valueCount;
979 Resource resource = getResource(readInt());
981 tgs.setValue(w, resource, null, this, len);
983 case COMMIT_AND_CONTINUE: {
984 XSupport xs = w.getService(XSupport.class);
985 xs.commitAndContinue(w, traits);
// ServiceException passes through unchanged; anything else is wrapped.
989 } catch(Exception e) {
990 if(e instanceof ServiceException)
991 throw (ServiceException)e;
993 throw new ServiceException(e);
// Cleanup: close the input stream and delete the temporary file.
996 if (writeState.in != null) {
997 try (InputStream in = writeState.in) {
998 } catch (IOException e) {
999 throw new ServiceException(e);
1001 writeState.in = null;
1002 writeState.tempFile.delete();
// Maps the runtime class of `value` to the matching Layer0 literal type (scalar wrappers, String and their arrays).
// NOTE(review): the result for a class not listed here is on a line not visible in this excerpt.
1008 private Resource getType(Object value) {
1009 Class<?> clazz = value.getClass();
1011 clazz == Float.class ? b.Float
1012 : clazz == Double.class ? b.Double
1013 : clazz == Integer.class ? b.Integer
1014 : clazz == String.class ? b.String
1015 : clazz == Boolean.class ? b.Boolean
1016 : clazz == Byte.class ? b.Byte
1017 : clazz == Long.class ? b.Long
1018 : clazz == float[].class ? b.FloatArray
1019 : clazz == double[].class ? b.DoubleArray
1020 : clazz == int[].class ? b.IntegerArray
1021 : clazz == String[].class ? b.StringArray
1022 : clazz == boolean[].class ? b.BooleanArray
1023 : clazz == byte[].class ? b.ByteArray
1024 : clazz == long[].class ? b.LongArray
1030 public long newCluster() {
1031 return -1 - (++writeState.clusterCount);
1034 public long getDefaultCluster() {
1035 return writeState.defaultCluster;
1038 public void setDefaultCluster(long cluster) {
1039 writeState.defaultCluster = cluster;
// Runs a nested delayed write: the request records into a fresh DelayedWriteGraph built on this graph,
// and the recording is then committed through a WriteOnlyRequest issued on this graph.
// NOTE(review): the try/finally structure and the exception handling bodies are not visible here.
1043 public void syncRequest(final DelayedWrite request) throws DatabaseException {
1047 final DelayedWriteGraph dwg = new DelayedWriteGraph(this);
1048 request.perform(dwg);
1050 syncRequest(new WriteOnlyRequest() {
1053 public void perform(WriteOnlyGraph graph) throws DatabaseException {
1054 dwg.commit(graph, request);
1059 } catch (DatabaseException e) {
// Other throwables are wrapped into DatabaseException.
1063 } catch (Throwable e) {
1065 throw new DatabaseException(e);
// Performs a write-only request against this graph through WriteSupport.
1073 public void syncRequest(WriteOnly request) throws DatabaseException {
// The default cluster set is cleared for the duration of the request; the previous value is kept for restoring.
1075 Resource defaultClusterSet = setClusterSet4NewResource(null);
1078 WriteSupport ws = session.getService(WriteSupport.class);
1079 ws.performWriteRequest(this, request);
// NOTE(review): the body of this catch is not visible here.
1080 } catch (DatabaseException e) {
1082 } catch (Throwable t) {
1083 throw new DatabaseException(t);
// Restores the previous default cluster set. NOTE(review): presumably inside a finally block - confirm.
1085 setClusterSet4NewResource(defaultClusterSet);
// Service lookup that substitutes delayed-write aware wrappers for ClusteringSupport and
// TransferableGraphSupport; every other API goes to the superclass.
1090 @SuppressWarnings("unchecked")
1092 public <T> T getService(Class<T> api) {
1094 if(ClusteringSupport.class == api) {
1096 final ClusteringSupport support = (ClusteringSupport)super.getService(api);
// Wrapper: all queries are forwarded to the real service; only createCluster() is answered locally.
1098 return (T)new ClusteringSupport() {
1101 public Resource getResourceByIndexAndCluster(int resourceIndex, long clusterId)
1102 throws DatabaseException, ResourceNotFoundException {
1103 return support.getResourceByIndexAndCluster(resourceIndex, clusterId);
1107 public Resource getResourceByKey(int resourceKey) throws ResourceNotFoundException {
1108 return support.getResourceByKey(resourceKey);
1112 public int getNumberOfResources(long clusterId)
1113 throws DatabaseException {
1114 return support.getNumberOfResources(clusterId);
1118 public long getCluster(Resource r) {
1119 return support.getCluster(r);
// Clusters created through this wrapper are logical clusters of the delayed graph (see newCluster()).
1123 public long createCluster() {
1124 return newCluster();
1128 public boolean isClusterSet(Resource r) throws DatabaseException {
1129 return support.isClusterSet(r);
1133 public Resource getClusterSetOfCluster(Resource r) throws DatabaseException {
1134 return support.getClusterSetOfCluster(r);
1138 public Resource getClusterSetOfCluster(long cluster) throws DatabaseException {
1139 return support.getClusterSetOfCluster(cluster);
1144 } else if (TransferableGraphSupport.class == api) {
1146 final TransferableGraphSupport parentSupport = session.getService(TransferableGraphSupport.class);
// Wrapper: value writes are recorded into the operation stream as CLAIM_VALUE_B records
// tagged with the PASSTHROUGH binding id; value reads go to the session's service.
1148 return (T)new TransferableGraphSupport() {
// NOTE(review): the line that writes the raw bytes themselves is not visible here.
1151 public void setValue(WriteOnlyGraph graph, Resource resource, VirtualGraph provider, byte[] raw) {
1152 writeByte(CLAIM_VALUE_B);
1153 writeInt(getId(resource));
1154 writeInt(raw.length);
1156 writeInt(getBindingId(PASSTHROUGH));
// NOTE(review): the line between the resource id and the payload is not visible here; presumably the length - confirm.
1160 public void setValue(WriteOnlyGraph graph, Resource resource, VirtualGraph provider, ByteReader reader, int amount)
1161 throws DatabaseException {
1162 writeByte(CLAIM_VALUE_B);
1163 writeInt(getId(resource));
1165 writeBytes(reader.readBytes(null, amount));
1166 writeInt(getBindingId(PASSTHROUGH));
1170 public byte[] getValue(ReadGraph graph, Resource resource) {
1171 return parentSupport.getValue(graph, resource);
1175 public InputStream getValueStream(ReadGraph graph, Resource resource) {
1176 return parentSupport.getValueStream(graph, resource);
// Any other service comes from the superclass.
1183 return super.getService(api);
1188 public <T> void addMetadata(Metadata data) throws ServiceException {
1189 MetadataUtils.addMetadata(session, metadata, data);
1192 public void addCommitAndContinue() {
1193 writeByte(COMMIT_AND_CONTINUE);
1197 public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1198 return MetadataUtils.getMetadata(session, metadata, clazz);
1202 public TreeMap<String, byte[]> getMetadata() {
1206 public <T> T syncRequest(WriteResult<T> request) throws DatabaseException {
1208 return request.perform(this);
1213 public VirtualGraph getProvider() {
// Forwards the request to the session's WriteSupport service.
1218 public void clearUndoList(WriteTraits writeTraits) {
1219 WriteSupport ws = session.getService(WriteSupport.class);
// NOTE(review): a line between the lookup and this call is not visible here; presumably a null check - confirm.
1221 ws.clearUndoList(writeTraits);
1225 public void markUndoPoint() {
1226 // TODO Auto-generated method stub
// Reads a related value from the underlying graph, but only for subjects that are not delayed placeholders.
// NOTE(review): the result for an InternalResource subject is on a line not visible here.
1231 public <T> T getPossibleRelatedValue(final Resource subject, final Resource relation, final Binding binding)
1232 throws ManyObjectsForFunctionalRelationException, BindingException, ServiceException {
1233 if(!(subject instanceof InternalResource)) {
1234 return super.getPossibleRelatedValue(subject, relation, binding);
// Reads a possible object from the underlying graph, but only for subjects that are not delayed placeholders.
// NOTE(review): the result for an InternalResource subject is on a line not visible here.
1241 final public Resource getPossibleObject(final Resource subject, final Resource relation)
1242 throws ManyObjectsForFunctionalRelationException, ServiceException {
1243 if(!(subject instanceof InternalResource)) {
1244 return super.getPossibleObject(subject, relation);
1250 public void close() {
1251 if (writeState.out != null) {
1252 try (OutputStream out = writeState.out) {
1253 } catch (IOException e) {
1254 Logger.defaultLogError("Failed to close delayed write graph temporary commit output stream", e);
1256 writeState.out = null;
1259 if (writeState.in != null) {
1260 try (InputStream in = writeState.in) {
1261 } catch (IOException e) {
1262 Logger.defaultLogError("Failed to close delayed write graph temporary commit input stream", e);
1264 writeState.in = null;
1267 if (writeState.tempFile != null) {
1268 writeState.tempFile.delete();
1269 writeState.tempFile = null;