1 /*******************************************************************************
2 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * VTT Technical Research Centre of Finland - initial API and implementation
11 *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
14 import java.io.ByteArrayInputStream;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.lang.ref.SoftReference;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.HashSet;
21 import java.util.Iterator;
23 import java.util.TreeMap;
24 import java.util.Vector;
25 import java.util.concurrent.CopyOnWriteArrayList;
27 import org.simantics.databoard.Bindings;
28 import org.simantics.databoard.binding.impl.TreeMapBinding;
29 import org.simantics.databoard.serialization.Serializer;
30 import org.simantics.db.ChangeSetIdentifier;
31 import org.simantics.db.Database;
32 import org.simantics.db.Database.Session.ChangeSetData;
33 import org.simantics.db.Database.Session.ChangeSetIds;
34 import org.simantics.db.Database.Session.ClusterChanges;
35 import org.simantics.db.Database.Session.ClusterIds;
36 import org.simantics.db.Database.Session.Refresh;
37 import org.simantics.db.Operation;
38 import org.simantics.db.SessionReference;
39 import org.simantics.db.common.UndoContextEx;
40 import org.simantics.db.common.utils.Logger;
41 import org.simantics.db.exception.DatabaseException;
42 import org.simantics.db.exception.InternalException;
43 import org.simantics.db.impl.ClusterI;
44 import org.simantics.db.impl.ResourceImpl;
45 import org.simantics.db.service.ClusterUID;
46 import org.simantics.db.service.ExternalOperation;
47 import org.simantics.db.service.ManagementSupport.ChangeSetListener;
48 import org.simantics.scl.runtime.function.Function1;
49 import org.simantics.scl.runtime.function.FunctionImpl1;
50 import org.simantics.utils.threads.ThreadUtils;
52 import gnu.trove.iterator.TLongObjectIterator;
53 import gnu.trove.map.hash.THashMap;
54 import gnu.trove.map.hash.TLongObjectHashMap;
// NOTE(review): the embedded source numbering (56, 57, 60, 62, ...) skips values,
// so lines are missing from this extract; several bodies below are visibly
// truncated (e.g. getId() shows no return). All comments are hedged accordingly.
//
// ChangeSetIdentifierImpl: identifies a single change set by its long id and
// lazily resolves its metadata map through the owning GraphSession.
56 final class ChangeSetIdentifierImpl implements ChangeSetIdentifier {
57 private final long id;
58 private final GraphSession graphSession;
// Lazily populated cache of deserialized metadata; null until first getMetadata().
59 private Map<String, byte[]> metadata;
60 ChangeSetIdentifierImpl(long id, GraphSession graphSession) {
62 this.graphSession = graphSession;
65 public final long getId() {
68 public final void setMetadata(Map<String, byte[]> metadata) {
69 this.metadata = metadata;
// Fetches the raw serialized metadata ("context") bytes for this change set
// from the server via the GraphSession. Guarded against a null session
// (presumably returns null in that case — the branch body is not visible here).
71 public final byte[] getContext()
72 throws DatabaseException {
73 if (null == graphSession)
75 return graphSession.getChangeSetContext(id);
// Serializer for the metadata payload: a TreeMap<String, byte[]> binding.
78 static Serializer METADATA_SERIALIZER =
79 Bindings.getSerializerUnchecked( new TreeMapBinding(Bindings.STRING,
80 Bindings.BYTE_ARRAY));
// Deserializes and caches the metadata map on first access. Errors are logged
// and swallowed rather than propagated (the method itself does not throw).
82 @SuppressWarnings("unchecked")
83 public Map<String, byte[]> getMetadata() {
85 if (metadata == null) {
86 byte[] data = getContext();
87 if (null != data && data.length > 0) {
88 metadata = (TreeMap<String, byte[]>) METADATA_SERIALIZER.deserialize(data);
91 } catch (DatabaseException e) {
92 Logger.defaultLogError(e);
94 } catch (IOException e) {
95 Logger.defaultLogError(e);
// Converts a collection of ChangeSetIdentifiers to a bare long[] of ids.
// NOTE(review): the visible "t[++i]" pre-increment implies i is initialized
// to -1 on a line not shown here — confirm against the full source.
101 static long[] changeSetIds2ints(Collection<ChangeSetIdentifier> changeSetIds) {
102 if (null == changeSetIds)
104 long[] t = new long[changeSetIds.size()];
106 for (ChangeSetIdentifier id : changeSetIds)
107 t[++i] = ((ChangeSetIdentifierImpl) id).getId();
// Same conversion for Operations, using each operation's change-set id.
110 static long[] operations2ints(Collection<Operation> ops) {
113 long[] t = new long[ops.size()];
115 for (Operation o : ops)
116 t[++i] = o.getCSId();
// Builds identifiers for a contiguous run of ids: firstChangeSetIds .. firstChangeSetIds+count-1.
119 static Collection<ChangeSetIdentifier> longs2changeSetIds(GraphSession gs, long firstChangeSetIds, int count) {
120 ArrayList<ChangeSetIdentifier> t = new ArrayList<ChangeSetIdentifier>();
121 t.ensureCapacity(count);
122 for (int i=0; i<count; ++i)
123 t.add(new ChangeSetIdentifierImpl(firstChangeSetIds + i, gs));
// Builds identifiers for the inclusive id range [min, max].
// NOTE(review): (int)length narrows a long — overflow unchecked for huge ranges.
126 static Collection<ChangeSetIdentifier> longs2changeSetIds(GraphSession gs, long min, long max) {
127 ArrayList<ChangeSetIdentifier> t = new ArrayList<ChangeSetIdentifier>();
130 long length = max - min + 1;
131 t.ensureCapacity((int)length);
132 for (int i=0; i<length; ++i) {
133 t.add(new ChangeSetIdentifierImpl(min+i, gs));
// GraphSession: abstract session-side facade over a Database.Session. Tracks
// loaded clusters, change-set listeners/ids, and metadata caching. Concrete
// subclasses supply transaction/commit primitives (see abstract methods below).
// NOTE(review): interior lines are missing from this extract (numbering gaps).
139 public abstract class GraphSession {
140 final protected boolean DEBUG = false;
141 final protected boolean VERBOSE = false;
142 protected Listener listener;
143 protected SessionImplSocket session;
// The underlying database session; asserted non-null in the constructor.
144 protected final Database.Session dbSession;
145 private SessionReference sessionReference;
// In-memory map from cluster id to loaded cluster implementation.
146 private TLongObjectHashMap<ClusterI> clusterMap = new TLongObjectHashMap<ClusterI>();
147 protected THashMap<String, BuiltinData> builtinMap = null;
148 private long firstChangeSetId = 0;
149 protected SynchronizeContextI synchronizeContext;
151 final UndoContextEx undoContext = new UndoContextEx("GraphSession");
// CopyOnWriteArrayList so listeners can be iterated without locking while
// add/remove happen concurrently.
152 final CopyOnWriteArrayList<ChangeSetListener> changeSetListeners = new CopyOnWriteArrayList<ChangeSetListener>();
153 private long lastChangeSetId = 0;
154 protected MetadataCache metadataCache = new MetadataCache();
// Constructor. NOTE(review): the null check runs after the final-field
// assignment; the RuntimeException still aborts construction, but the order
// is unusual — intentional-looking, left as-is.
156 public GraphSession(SessionImplSocket sessionImpl, SessionReference sessionReference, Database.Session dbSession) {
157 this.dbSession = dbSession;
158 if (null == dbSession)
159 throw new RuntimeException("Failed to initialize GraphSession. Database.Session can not be null.");
160 // if (!(sessionReference instanceof ProCoreSessionReference))
161 // throw new RuntimeException("Failed to initialize GraphSession. SessionReference must be instance of ProCoreSessionReference.");
162 this.sessionReference = sessionReference;
163 this.session = sessionImpl;
// --- Change-set listener registration (thread-safe via the COW list) ---
165 void addChangeSetListener(ChangeSetListener csl) {
166 changeSetListeners.add(csl);
168 void removeChangeSetListener(ChangeSetListener csl) {
169 changeSetListeners.remove(csl);
171 SessionReference getSessionReference() {
172 return sessionReference;
// Lifecycle hooks; bodies not visible in this extract.
175 public void open() throws InternalException {
179 public void close() throws InternalException {
183 public long getSessionId() {
184 return sessionReference.getSessionId();
187 // public String getHost() {
188 // return sessionReference.serverReference.socketAddress.getHostName();
191 // public int getPort() {
192 // return sessionReference.serverReference.socketAddress.getPort();
195 long getFirstChangeSetId() {
196 return firstChangeSetId;
// Builds the on-disk file name for a cluster, e.g. "cluster42.dat".
199 String getClusterFileName(long clusterId) {
200 String fileName = "cluster" + clusterId + ".dat";
// Sums ClusterI.getUsedSpace() over all loaded clusters; failures are logged
// per-cluster and skipped. NOTE(review): "faile" in the log string is a typo
// in the original runtime message — flagged, not changed here.
204 int computeClusterMemoryUse() {
207 TLongObjectIterator<ClusterI> i = clusterMap.iterator();
208 while (i.hasNext()) {
210 ClusterI c = i.value();
212 size += c.getUsedSpace();
213 } catch (DatabaseException e) {
214 Logger.defaultLogInfo("GetUsedSpace faile for cluster " + c);
// Debug helper: prints total used space (MiB) and cluster count to stdout,
// then clears the loadedClusters tracking set.
222 void printClusterSize() {
227 TLongObjectIterator<ClusterI> i = clusterMap.iterator();
228 while (i.hasNext()) {
230 ClusterI c = i.value();
233 size += c.getUsedSpace();
234 } catch (DatabaseException e) {
235 Logger.defaultLogInfo("GetUsedSpace faile for cluster " + c);
242 loadedClusters.clear();
243 System.out.println("" + size / (1024 * 1024) + "M in " + amount + " clusters.");
// Tracks cluster ids loaded since the last printClusterSize() — TODO confirm,
// the population site is not visible in this extract.
247 private HashSet<Long> loadedClusters = new HashSet<Long>();
// Evicts a cluster from the cache map; presumably frees its memory elsewhere.
249 ClusterI freeClusterFromCacheAndFreeClusterMemory(long clusterId) {
250 ClusterI c = clusterMap.remove(clusterId);
// Debug dump of a cluster-update operation byte array.
256 private void debugPrint(UpdateClusterFunction f) {
257 for (int i = 0; i < f.operation.length; ++i) {
258 System.out.println("op=" + f.operation[i]);
// Pushes a serialized cluster-update operation to the server. Marks the
// session's cluster stream dirty first. Return value semantics not visible
// in this extract (lines missing).
264 public boolean updateCluster(UpdateClusterFunction f) {
265 session.clusterStream.markDirty();
269 System.out.println("update cluster");
273 // long start = System.nanoTime();
274 dbSession.updateCluster(f.operation);
275 // long duration = System.nanoTime() - start;
276 // total += duration;
277 // System.err.println("graphClient.call = " + 1e-9*duration + " " + 1e-9*total);
278 // System.out.println("GraphSession.updateCluster done.");
280 } catch (Throwable e) {
// Returns the raw metadata bytes for a change set: first consults the local
// MetadataCache, then warns if the id predates the server's first retained
// change set, finally asks the server.
285 public byte[] getChangeSetContext(long id)
286 throws DatabaseException {
287 byte[] data = metadataCache.get(id);
290 if (id < getServerInformation().firstChangeSetId) {
291 Logger.defaultLogInfo("Asking purged change set metadata. uid=" + id);
294 return dbSession.getChangeSetMetadata(id);
// Statement factory — currently always the "old" implementation.
297 public StatementImpl getStatement(ResourceImpl s, ResourceImpl p, ResourceImpl o) {
298 return new StatementImplOld(s, p, o);
// Reserves a fresh cluster id, retrying (recursively) past the reserved
// builtin-id range. NOTE(review): unbounded recursion if the server keeps
// returning builtin-range ids — presumably cannot happen; confirm.
300 public long newClusterId() throws DatabaseException {
301 long id = dbSession.reserveIds(1);
302 if (id <= ClusterUID.Builtin.second)
303 return newClusterId();
311 // public ClusterI getClusterImpl(ClusterUID clusterUID, int clusterKey)
312 // throws DatabaseException {
313 // assert(ClusterUID.isLegal(clusterUID));
315 // if(Development.DEVELOPMENT) {
316 // if(Development.isTrue(DevelopmentKeys.READGRAPH_COUNT)) {
317 // ReadGraphImpl.counters.adjustOrPutValue("load " + clusterUID + " " + clusterKey, 1, 1);
321 //// ITask task = ThreadLogger.getInstance().begin("Load cluster " + clusterId);
323 // System.out.println("DEBUG: loading cluster=" + clusterUID);
324 //// System.out.println("getClusterImpl " + clusterId);
325 // long start = System.nanoTime();
326 // Cluster t = dbSession.getCluster(clusterUID.asBytes());
327 // long duration = System.nanoTime() - start;
329 // //System.out.println("loaded in " + 1e-9*duration + " total " + 1e-9*load);
330 //// if (Constants.NullClusterId == t.clusterId) {
331 //// throw new DatabaseException("Cluster " + t.clusterId + " not in server.");
334 // ByteBuffer deflated = t.getDeflated();
335 // start = System.nanoTime();
336 // Object[] arrays = clusterDecompressor.inflateCluster(t.getInflateSize(), deflated);
337 // duration = System.nanoTime() - start;
338 // inflate += duration;
339 // //System.out.println("inflated in " + 1e-9*duration + " total " + 1e-9*inflate);
340 // long[] longs = (long[]) arrays[0];
341 // int[] ints = (int[]) arrays[1];
342 // byte[] bytes = (byte[]) arrays[2];
344 //// System.out.println("Got cluster " + clusterId + " from core.");
345 // ClusterImpl cluster = ClusterImpl.make(longs, ints, bytes, session.clusterTranslator, clusterKey);
346 //// System.out.println("cluster[ " + clusterId + "].size = " + cluster.getUsedSpace(-1));
// Cache lookup only — does not load the cluster from the server.
351 ClusterI getCluster(long clusterId) {
352 return clusterMap.get(clusterId);
355 ClusterI getClusterOrThrow(long clusterId) throws IllegalArgumentException {
356 ClusterI c = this.getCluster(clusterId);
358 throw new IllegalArgumentException("Cluster " + clusterId + " does not exist.");
// Passes an arbitrary command string straight to the server.
362 public String execute(String command) throws DatabaseException {
363 return dbSession.execute(command);
// All change sets known to the server, as a contiguous id run.
366 public Collection<ChangeSetIdentifier> getChangeSets()
367 throws DatabaseException {
368 ChangeSetIds t = dbSession.getChangeSetIds();
369 return ChangeSetIdentifierImpl.longs2changeSetIds(this, t.getFirstChangeSetId(), t.getCount());
371 public long getLastChangeSetId() {
372 return lastChangeSetId;
// Change sets in [from, to], clamped to [1, upperLimit]. Throws if from > to
// (the guard condition line itself is missing from this extract).
374 public Collection<ChangeSetIdentifier> getChangeSets(long from, long to, long upperLimit)
375 throws DatabaseException {
377 throw new DatabaseException("Illegal argument: from=" + from + " > to=" + to + ".")
378 long min = Math.max(from, 1);
379 long max = Math.min(to, upperLimit);
380 return ChangeSetIdentifierImpl.longs2changeSetIds(this, min, max);
// Fetches change-set data for [minCS, maxCS] with synchronizeContext set for
// the duration of the call (cleared in what is presumably a finally block).
// Throwables are rethrown as DatabaseException, preserving the original when
// it already is one. NOTE(review): the generic rethrow at 395 drops the cause.
383 public boolean getChangeSets(long minCS, long maxCS, SynchronizeContextI context)
384 throws DatabaseException {
386 synchronizeContext = context;
387 ChangeSetData t = dbSession.getChangeSetData(minCS, maxCS, context);
389 } catch (Throwable e) {
392 if (e instanceof DatabaseException)
393 throw (DatabaseException) e;
395 throw new DatabaseException("GetChangeSetData call to server failed.");
397 synchronizeContext = null;
// Applies fn to every non-disposed external operation of every operation.
401 public static void forExternals(Collection<Operation> operations, Function1<ExternalOperation, Boolean> fn) {
402 for(Operation o : operations) {
403 for(ExternalOperation e : o.getExternalOperations()) {
404 if(!e.isDisposed()) fn.apply(e);
// Undoes the given operations on the server. External operations are handled
// first via forExternals (the callback body is not visible here). The
// synchronizeContext is set for the server call and cleared afterwards.
409 public boolean undo(Collection<Operation> changeSetIds, SynchronizeContextI context)
410 throws DatabaseException {
413 forExternals(changeSetIds, new FunctionImpl1<ExternalOperation, Boolean>() {
415 public Boolean apply(ExternalOperation op) {
421 long[] cids = ChangeSetIdentifierImpl.operations2ints(changeSetIds);
422 synchronizeContext = context;
425 System.out.println("DEBUG: Undo cs=" + id);
426 return dbSession.undo(cids, context);
427 } catch (Throwable e) {
430 if (e instanceof DatabaseException)
431 throw (DatabaseException) e;
433 throw new DatabaseException("Undo call to server failed.", e);
435 synchronizeContext = null;
// Asks the server which clusters changed since change set csid; returns their
// UIDs assembled from the parallel first/second long arrays.
439 public ClusterUID[] getRefresh2(long csid) throws DatabaseException {
440 Refresh t = dbSession.getRefresh(csid);
442 System.err.println("getRefresh2 old cs=" + csid + " head cs=" + t.getHeadChangeSetId() + " chanaged clusters=" + t.getFirst().length);
443 ClusterUID[] clusters = new ClusterUID[t.getFirst().length];
444 for (int i=0; i<t.getFirst().length; ++i)
445 clusters[i] = new ClusterUID(t.getFirst()[i], t.getSecond()[i]);
// Per-cluster change listing since csid (statement/value indices).
449 public ClusterChanges getClusterChanges(ClusterUID cluster, long csid) throws DatabaseException {
450 ClusterChanges t = dbSession.getClusterChanges(csid, cluster.asBytes());
452 System.err.println("getClusterChanges old cs=" + csid + " head cs=" + t.getHeadChangeSetId() + " cluster=" + cluster + " statements=" + t.getPredicateIndex().length + " values=" + t.getValueIndex().length);
// Delegates resource-key -> cluster-id resolution to the session.
455 public long getCluster(int id) {
456 return session.getCluster(id);
// Value holder for one fetched segment of a (possibly larger) resource value:
// total value size plus the bytes of this segment.
459 private static class ResourceSegment {
460 public long valueSize;
464 ResourceSegment(long valueSize, byte[] bytes) {
465 this.valueSize = valueSize;
// Fetches one segment (up to `size` bytes at `offset`) of a resource value.
469 public ResourceSegment getResourceSegment(int resourceIndex, ClusterUID clusterUID, long offset, short size)
470 throws DatabaseException {
472 System.out.println("DEBUG: getResourceSegment ri=" + resourceIndex + " cid=" + clusterUID + " offset=" + offset + " size=" + size);
473 org.simantics.db.Database.Session.ResourceSegment t = dbSession.getResourceSegment(clusterUID.asBytes(), resourceIndex, offset, size);
474 return new ResourceSegment(t.getValueSize(), t.getSegment());
// Convenience overload: whole value from offset 0 (length 0 = "all").
477 public byte[] getResourceValue(int resourceIndex, ClusterUID clusterUID)
478 throws DatabaseException {
479 return getResourceValue(resourceIndex, clusterUID, 0, 0);
// Streams a resource value. Fetches the first segment eagerly (max 0xFFFF
// bytes per server round-trip); if the whole value fits in one segment a
// ByteArrayInputStream is returned, otherwise an anonymous InputStream lazily
// pulls further segments on demand. length == 0 means "entire value".
482 public InputStream getResourceValueStream(final int resourceIndex, final ClusterUID clusterUID, long offset, int length) throws DatabaseException {
485 System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID + " off=" + offset + " len=" + length);
// IMAX: maximum bytes per segment request (unsigned 16-bit limit).
486 final int IMAX = 0xFFFF;
487 short slen = (short)Math.min(length != 0 ? length : IMAX, IMAX);
488 final ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, offset, slen);
490 throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
491 " cluster=" + clusterUID + " off=" + offset + " len=" + length + " (1).");
// Widen the short back to its unsigned value for the size sanity check.
492 int ilen = (int)slen & 0xFFFF;
493 assert(s.bytes.length <= ilen);
// Values larger than Integer.MAX_VALUE cannot be streamed through int-based lengths.
495 if (s.valueSize > Integer.MAX_VALUE)
496 throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
497 " cluster=" + clusterUID + " off=" + offset + " len=" + length +
498 ". Value size=" + s.valueSize + " (2).");
499 length = (int)s.valueSize;
501 long rSize = s.valueSize - offset;
503 throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
504 " cluster=" + clusterUID + " off=" + offset + " len=" + length +
505 ". Value size=" + s.valueSize + " (3).");
// Single-segment fast path: the first fetch already holds everything.
506 else if (length <= IMAX)
507 return new ByteArrayInputStream(s.bytes);
509 final int finalLength = length;
// Multi-segment path: lazy, single-byte read() that refills from the server
// when the current segment is exhausted. DatabaseException is wrapped in
// IOException to satisfy the InputStream contract.
511 return new InputStream() {
513 int left = finalLength;
514 long valueOffset = 0;
516 ResourceSegment _s = s;
519 public int read() throws IOException {
521 if(left <= 0) return -1;
523 if(offset == _s.bytes.length) {
524 short slen = (short)Math.min(left, IMAX);
525 valueOffset += _s.bytes.length;
527 _s = getResourceSegment(resourceIndex, clusterUID, valueOffset, slen);
528 } catch (DatabaseException e) {
529 throw new IOException(e);
// Map signed byte to the 0..255 range required by InputStream.read().
535 int result = _s.bytes[offset++];
536 if(result < 0) result += 256;
// Materializes a resource value into a byte[]. Mirrors the streaming variant:
// first segment fetched eagerly, then a loop (its header line is missing from
// this extract) pulls the remaining segments, verifying each one reports the
// same total VALUE_SIZE and the expected segment length. length == 0 = "all".
545 public byte[] getResourceValue(int resourceIndex, ClusterUID clusterUID, long offset, int length)
546 throws DatabaseException {
548 System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID + " off=" + offset + " len=" + length);
// IMAX: per-request segment cap (unsigned 16-bit).
549 final int IMAX = 0xFFFF;
550 short slen = (short)Math.min(length != 0 ? length : IMAX, IMAX);
551 ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, offset, slen);
// Remembered so later segments can be checked for a concurrently changed value.
552 final long VALUE_SIZE = s.valueSize;
554 throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
555 " cluster=" + clusterUID + " off=" + offset + " len=" + length + " (1).");
556 int ilen = (int)slen & 0xFFFF;
557 assert(s.bytes.length <= ilen);
559 if (s.valueSize > Integer.MAX_VALUE)
560 throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
561 " cluster=" + clusterUID + " off=" + offset + " len=" + length +
562 ". Value size=" + s.valueSize + " (2).");
563 length = (int)s.valueSize;
565 long rSize = s.valueSize - offset;
567 throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
568 " cluster=" + clusterUID + " off=" + offset + " len=" + length +
569 ". Value size=" + s.valueSize + " (3).");
// Single-segment fast path (return of s.bytes presumably on a missing line).
570 else if (length <= IMAX)
// Multi-segment assembly: copy the first segment, then fetch/copy the rest.
572 byte[] bytes = new byte[length];
573 int left = (int)length - s.bytes.length;
574 int cur = s.bytes.length;
575 offset += s.bytes.length;
576 System.arraycopy(s.bytes, 0, bytes, 0, cur);
578 slen = (short)Math.min(left, IMAX);
579 s = getResourceSegment(resourceIndex, clusterUID, offset, slen);
580 ilen = (int)slen & 0xFFFF;
// Defensive check: the value must not have changed size mid-read, and the
// server must return exactly the requested segment length.
581 if (s.valueSize != VALUE_SIZE || s.bytes.length != ilen)
582 throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
583 " cluster=" + clusterUID + " off=" + offset + " len=" + length +
584 ". Value size=" + s.valueSize + " blen=" + s.bytes.length + " (4).");
585 System.arraycopy(s.bytes, 0, bytes, cur, s.bytes.length);
586 left -= s.bytes.length;
587 cur += s.bytes.length;
588 offset += s.bytes.length;
// Total size of a resource value, obtained by fetching the first segment and
// reading its valueSize field (throw branch visible for the failure case).
593 public long getResourceValueSize(int resourceIndex, ClusterUID clusterUID)
594 throws DatabaseException {
596 System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID);
597 final short SMAX = (short)0xFFFF;
598 ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, 0, SMAX);
600 throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
601 " cluster=" + clusterUID + " size=" + s.valueSize);
// Drains pending requests by issuing an empty execute() round-trip —
// TODO confirm intended semantics; most of the body is not visible here.
604 int wait4RequestsLess(int limit)
605 throws DatabaseException {
606 dbSession.execute("");
// Listener callback: invoked when the head change-set id advances.
610 void onChangeSetId(int thread, long csid, boolean refresh);
613 void setListener(Listener listener) {
614 this.listener = listener;
// Notifies the single Listener synchronously, then — if csid advanced —
// dispatches all registered ChangeSetListeners asynchronously on the blocking
// work executor, logging (not propagating) any listener failure.
617 protected void updateLastChangeSetId(int thread, final long csid, boolean refresh) {
618 if (null != listener)
619 listener.onChangeSetId(thread, csid, refresh);
620 if (csid > lastChangeSetId) {
621 lastChangeSetId = csid;
622 final Iterator<ChangeSetListener> it = changeSetListeners.iterator();
623 ThreadUtils.getBlockingWorkExecutor().execute(new Runnable() {
626 while (it.hasNext()) {
627 ChangeSetListener l = it.next();
630 } catch (Throwable t) {
631 Logger.defaultLogError(t);
// --- Transaction/commit primitives supplied by concrete subclasses ---
638 protected abstract ServerInformationImpl getServerInformation()
639 throws DatabaseException;
640 public abstract void acceptCommit(long transactionId, long csid, byte[] metadata)
641 throws DatabaseException;
642 public abstract void cancelCommit(long transactionId, long csid, byte[] metadata, SynchronizeContextI context)
643 throws DatabaseException;
644 public abstract void endTransaction(long transactionId, boolean write)
645 throws DatabaseException;
646 public abstract long askWriteTransaction(int thread, long transactionId)
647 throws DatabaseException;
648 public abstract long askReadTransaction(int thread)
649 throws DatabaseException;
650 public abstract void stop()
651 throws DatabaseException;
653 public abstract long reserveIds(int count)
654 throws DatabaseException;
// MetadataCache: small synchronized ring buffer (SIZE slots, backed by a
// Vector) holding metadata byte[] for the most recent *contiguous* run of
// change-set ids. A non-consecutive insert resets the cache (see addNext).
655 static class MetadataCache {
656 private final boolean DEBUG = false;
657 private final int SIZE = 10;
658 private int lastInd; // index of last inserted element
659 private long lastId; // id of last inserted element
660 private int count; // number of inserted elements
661 Vector<byte[]> buffer;
// (Re)initializes the backing buffer to SIZE empty slots.
670 private void init() {
672 System.out.println("DEBUG: MetadataCache init");
676 buffer = new Vector<byte[]>();
677 buffer.setSize(SIZE);
// Advances lastInd, wrapping at SIZE (wrap handling on missing lines).
679 private boolean incLastInd() {
681 if (lastInd >= SIZE) {
// Inserts metadata for id; resets the cache if id does not directly follow
// the previously inserted id (only a contiguous sequence is supported).
687 synchronized void addNext(long id, byte[] data)
688 throws DatabaseException {
690 System.out.println("DEBUG: MetadataCache add id=" + id);
691 if (lastId != 0 && lastId != id - 1)
692 init(); // Only continuous sequence supported.
694 buffer.set(lastInd, data);
// Returns cached metadata for id, or presumably null when the id is outside
// the cached window (the miss-return line is not visible here).
699 synchronized byte[] get(long id) {
701 System.out.println("DEBUG: MetadataCache get id=" + id);
702 if (id > lastId || id <= lastId - count)
704 int ind = lastInd - (int)(lastId - id);
707 byte[] found = buffer.get(ind);
710 System.out.println("DEBUG: MetadataCache found id=" + id);
// Lists all cluster UIDs known to the server from the parallel first/second
// arrays. NOTE(review): line 721 takes min(N1, N2) into N2 — verify against
// the full source whether min(N2, status) was intended instead.
714 public ClusterUID[] listClusters() throws InternalException {
715 ClusterIds t = dbSession.getClusterIds();
716 long[] first = t.getFirst();
717 long[] second = t.getSecond();
718 int N1 = (null == first) ? 0 : first.length;
719 N1 = Math.min(N1, t.getStatus());
720 int N2 = (null == second) ? 0 : second.length;
721 N2 = Math.min(N1, N2);
722 ClusterUID[] clusters = new ClusterUID[N2];
723 for (int i=0; i<N2; ++i)
724 clusters[i] = new ClusterUID(first[i], second[i]);
// True if the underlying database session has rolled back.
727 public boolean rolledback() {
728 return dbSession.rolledback();
734 SoftReference<ResourceImpl> weakResource = null;
735 BuiltinData(long id, long cluster) {
737 this.cluster = cluster;
740 class UpdateClusterFunction {
741 public byte[] operation = null;
742 public UpdateClusterFunction(byte[] operation) {
743 this.operation = operation;