/*******************************************************************************
 * Copyright (c) 2007, 2010 Association for Decentralized Information Management
 * in Industry THTH ry.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     VTT Technical Research Centre of Finland - initial API and implementation
 *******************************************************************************/
package fi.vtt.simantics.procore.internal;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.Vector;
import java.util.concurrent.CopyOnWriteArrayList;

import org.simantics.databoard.Bindings;
import org.simantics.databoard.binding.impl.TreeMapBinding;
import org.simantics.databoard.serialization.Serializer;
import org.simantics.db.ChangeSetIdentifier;
import org.simantics.db.Database;
import org.simantics.db.Database.Session.ChangeSetData;
import org.simantics.db.Database.Session.ChangeSetIds;
import org.simantics.db.Database.Session.ClusterChanges;
import org.simantics.db.Database.Session.ClusterIds;
import org.simantics.db.Database.Session.Refresh;
import org.simantics.db.Operation;
import org.simantics.db.SessionReference;
import org.simantics.db.common.UndoContextEx;
import org.simantics.db.common.utils.Logger;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.exception.InternalException;
import org.simantics.db.impl.ClusterI;
import org.simantics.db.impl.ResourceImpl;
import org.simantics.db.service.ClusterUID;
import org.simantics.db.service.ExternalOperation;
import org.simantics.db.service.ManagementSupport.ChangeSetListener;
import org.simantics.scl.runtime.function.Function1;
import org.simantics.scl.runtime.function.FunctionImpl1;
import org.simantics.utils.threads.ThreadUtils;

import gnu.trove.iterator.TLongObjectIterator;
import gnu.trove.map.hash.THashMap;
import gnu.trove.map.hash.TLongObjectHashMap;

final class ChangeSetIdentifierImpl implements ChangeSetIdentifier {
    private final long id;
    private final GraphSession graphSession;
    private Map<String, byte[]> metadata;

    ChangeSetIdentifierImpl(long id, GraphSession graphSession) {
        this.id = id;
        this.graphSession = graphSession;
        this.metadata = null;
    }

    public final long getId() {
        return id;
    }

    public final void setMetadata(Map<String, byte[]> metadata) {
        this.metadata = metadata;
    }

    public final byte[] getContext() throws DatabaseException {
        if (null == graphSession)
            return new byte[0];
        return graphSession.getChangeSetContext(id);
    }

    static Serializer METADATA_SERIALIZER =
            Bindings.getSerializerUnchecked(new TreeMapBinding(Bindings.STRING, Bindings.BYTE_ARRAY));

    @SuppressWarnings("unchecked")
    public Map<String, byte[]> getMetadata() {
        try {
            if (metadata == null) {
                byte[] data = getContext();
                if (null != data && data.length > 0)
                    metadata = (TreeMap<String, byte[]>) METADATA_SERIALIZER.deserialize(data);
            }
        } catch (DatabaseException e) {
            Logger.defaultLogError(e);
            e.printStackTrace();
        } catch (IOException e) {
            Logger.defaultLogError(e);
            e.printStackTrace();
        }
        return metadata;
    }

    static long[] changeSetIds2ints(Collection<ChangeSetIdentifier> changeSetIds) {
        if (null == changeSetIds)
            return new long[0];
        long[] t = new long[changeSetIds.size()];
        int i = -1;
        for (ChangeSetIdentifier id : changeSetIds)
            t[++i] = ((ChangeSetIdentifierImpl) id).getId();
        return t;
    }
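    // changeSetIds2ints above and operations2ints below flatten identifier
    // collections into the raw long[] id arrays used by the Database.Session
    // wire interface. Illustrative use (variable names are not from this file):
    //
    //   Collection<ChangeSetIdentifier> csids = graphSession.getChangeSets();
    //   long[] raw = changeSetIds2ints(csids); // raw[k] == k-th identifier's id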
    static long[] operations2ints(Collection<Operation> ops) {
        if (null == ops)
            return new long[0];
        long[] t = new long[ops.size()];
        int i = -1;
        for (Operation o : ops)
            t[++i] = o.getCSId();
        return t;
    }

    static Collection<ChangeSetIdentifier> longs2changeSetIds(GraphSession gs, long firstChangeSetId, int count) {
        ArrayList<ChangeSetIdentifier> t = new ArrayList<ChangeSetIdentifier>();
        t.ensureCapacity(count);
        for (int i = 0; i < count; ++i)
            t.add(new ChangeSetIdentifierImpl(firstChangeSetId + i, gs));
        return t;
    }

    static Collection<ChangeSetIdentifier> longs2changeSetIds(GraphSession gs, long min, long max) {
        ArrayList<ChangeSetIdentifier> t = new ArrayList<ChangeSetIdentifier>();
        if (max < min)
            return t;
        t.ensureCapacity((int) (max - min + 1));
        for (long id = min; id <= max; ++id)
            t.add(new ChangeSetIdentifierImpl(id, gs));
        return t;
    }
}

/**
 * Mediates between a client session implementation and the underlying
 * {@link Database.Session}: cluster cache, change set bookkeeping and
 * the abstract server protocol implemented by concrete subclasses.
 */
abstract class GraphSession {
    protected boolean DEBUG = false;
    protected boolean VERBOSE = false;
    protected final SessionImplSocket session;
    protected final SessionReference sessionReference;
    final Database.Session dbSession;
    private Listener listener = null;
    protected TLongObjectHashMap<ClusterI> clusterMap = new TLongObjectHashMap<ClusterI>();
    protected THashMap<String, BuiltinData> builtinMap = null;
    private long firstChangeSetId = 0;
    protected SynchronizeContextI synchronizeContext;
    final UndoContextEx undoContext = new UndoContextEx("GraphSession");
    final CopyOnWriteArrayList<ChangeSetListener> changeSetListeners = new CopyOnWriteArrayList<ChangeSetListener>();
    private long lastChangeSetId = 0;
    protected MetadataCache metadataCache = new MetadataCache();

    public GraphSession(SessionImplSocket sessionImpl, SessionReference sessionReference, Database.Session dbSession) {
        this.dbSession = dbSession;
        if (null == dbSession)
            throw new RuntimeException("Failed to initialize GraphSession. Database.Session can not be null.");
//        if (!(sessionReference instanceof ProCoreSessionReference))
//            throw new RuntimeException("Failed to initialize GraphSession. SessionReference must be instance of ProCoreSessionReference.");
        this.sessionReference = sessionReference;
        this.session = sessionImpl;
    }

    void addChangeSetListener(ChangeSetListener csl) {
        changeSetListeners.add(csl);
    }

    void removeChangeSetListener(ChangeSetListener csl) {
        changeSetListeners.remove(csl);
    }

    SessionReference getSessionReference() {
        return sessionReference;
    }

    public void open() throws InternalException {
        dbSession.open();
    }

    public void close() throws InternalException {
        dbSession.close();
    }

    public long getSessionId() {
        return sessionReference.getSessionId();
    }

//    public String getHost() {
//        return sessionReference.serverReference.socketAddress.getHostName();
//    }
//
//    public int getPort() {
//        return sessionReference.serverReference.socketAddress.getPort();
//    }

    long getFirstChangeSetId() {
        return firstChangeSetId;
    }

    String getClusterFileName(long clusterId) {
        String fileName = "cluster" + clusterId + ".dat";
        return fileName;
    }

    int computeClusterMemoryUse() {
        int size = 0;
        TLongObjectIterator<ClusterI> i = clusterMap.iterator();
        while (i.hasNext()) {
            i.advance();
            ClusterI c = i.value();
            try {
                size += c.getUsedSpace();
            } catch (DatabaseException e) {
                Logger.defaultLogInfo("getUsedSpace failed for cluster " + c);
            }
        }
        return size;
    }

    void printClusterSize() {
        long size = 0;
        long amount = 0;
        TLongObjectIterator<ClusterI> i = clusterMap.iterator();
        while (i.hasNext()) {
            i.advance();
            ClusterI c = i.value();
            if (c.isLoaded()) {
                try {
                    size += c.getUsedSpace();
                } catch (DatabaseException e) {
                    Logger.defaultLogInfo("getUsedSpace failed for cluster " + c);
                }
                amount++;
            }
        }
        if (amount > 50)
            loadedClusters.clear();
        System.out.println("" + size / (1024 * 1024) + "M in " + amount + " clusters.");
    }

    private HashSet<ClusterI> loadedClusters = new HashSet<ClusterI>();

    ClusterI freeClusterFromCacheAndFreeClusterMemory(long clusterId) {
        ClusterI c = clusterMap.remove(clusterId);
        if (null != c)
            c.releaseMemory();
        return c;
    }

    private void debugPrint(UpdateClusterFunction f) {
        for (int i = 0; i < f.operation.length; ++i) {
            System.out.println("op=" + f.operation[i]);
        }
    }

    long total = 0;
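    // updateCluster() forwards one encoded cluster-change block to the
    // underlying Database.Session and reports failure as a boolean instead of
    // throwing. A hypothetical caller (names illustrative, not from this file):
    //
    //   UpdateClusterFunction f = new UpdateClusterFunction(encodedOps);
    //   if (!graphSession.updateCluster(f))
    //       Logger.defaultLogError("cluster update failed");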
    public boolean updateCluster(UpdateClusterFunction f) {
        session.clusterStream.markDirty();
        assert (null != f);
        try {
            if (DEBUG) {
                System.out.println("update cluster");
                if (VERBOSE)
                    debugPrint(f);
            }
//            long start = System.nanoTime();
            dbSession.updateCluster(f.operation);
//            long duration = System.nanoTime() - start;
//            total += duration;
//            System.err.println("graphClient.call = " + 1e-9*duration + " " + 1e-9*total);
//            System.out.println("GraphSession.updateCluster done.");
            return true; // ok
        } catch (Throwable e) {
            e.printStackTrace();
            return false;
        }
    }

    public byte[] getChangeSetContext(long id) throws DatabaseException {
        byte[] data = metadataCache.get(id);
        if (null != data)
            return data;
        if (id < getServerInformation().firstChangeSetId) {
            Logger.defaultLogInfo("Asking for purged change set metadata. uid=" + id);
            return new byte[0];
        }
        return dbSession.getChangeSetMetadata(id);
    }

    public StatementImpl getStatement(ResourceImpl s, ResourceImpl p, ResourceImpl o) {
        return new StatementImplOld(s, p, o);
    }

    public long newClusterId() throws DatabaseException {
        long id = dbSession.reserveIds(1);
        if (id <= ClusterUID.Builtin.second)
            return newClusterId();
        return id;
    }

    long load = 0;
    long inflate = 0;

//    public ClusterI getClusterImpl(ClusterUID clusterUID, int clusterKey)
//    throws DatabaseException {
//        assert(ClusterUID.isLegal(clusterUID));
//
//        if(Development.DEVELOPMENT) {
//            if(Development.isTrue(DevelopmentKeys.READGRAPH_COUNT)) {
//                ReadGraphImpl.counters.adjustOrPutValue("load " + clusterUID + " " + clusterKey, 1, 1);
//            }
//        }
//
////        ITask task = ThreadLogger.getInstance().begin("Load cluster " + clusterId);
//        if (DEBUG)
//            System.out.println("DEBUG: loading cluster=" + clusterUID);
////        System.out.println("getClusterImpl " + clusterId);
//        long start = System.nanoTime();
//        Cluster t = dbSession.getCluster(clusterUID.asBytes());
//        long duration = System.nanoTime() - start;
//        load += duration;
//        //System.out.println("loaded in " + 1e-9*duration + " total " + 1e-9*load);
////        if (Constants.NullClusterId == t.clusterId) {
////            throw new DatabaseException("Cluster " + t.clusterId + " not in server.");
////        }
//
//        ByteBuffer deflated = t.getDeflated();
//        start = System.nanoTime();
//        Object[] arrays = clusterDecompressor.inflateCluster(t.getInflateSize(), deflated);
//        duration = System.nanoTime() - start;
//        inflate += duration;
//        //System.out.println("inflated in " + 1e-9*duration + " total " + 1e-9*inflate);
//        long[] longs = (long[]) arrays[0];
//        int[] ints = (int[]) arrays[1];
//        byte[] bytes = (byte[]) arrays[2];
//
////        System.out.println("Got cluster " + clusterId + " from core.");
//        ClusterImpl cluster = ClusterImpl.make(longs, ints, bytes, session.clusterTranslator, clusterKey);
////        System.out.println("cluster[ " + clusterId + "].size = " + cluster.getUsedSpace(-1));
////        task.finish();
//        return cluster;
//    }

    ClusterI getCluster(long clusterId) {
        return clusterMap.get(clusterId);
    }

    ClusterI getClusterOrThrow(long clusterId) throws IllegalArgumentException {
        ClusterI c = this.getCluster(clusterId);
        if (c == null)
            throw new IllegalArgumentException("Cluster " + clusterId + " does not exist.");
        return c;
    }

    public String execute(String command) throws DatabaseException {
        return dbSession.execute(command);
    }

    public Collection<ChangeSetIdentifier> getChangeSets() throws DatabaseException {
        ChangeSetIds t = dbSession.getChangeSetIds();
        return ChangeSetIdentifierImpl.longs2changeSetIds(this, t.getFirstChangeSetId(), t.getCount());
    }

    public long getLastChangeSetId() {
        return lastChangeSetId;
    }
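    // The range variant below clamps its arguments instead of failing on
    // out-of-range values: 'from' is raised to 1 (the first possible change
    // set id) and 'to' is capped at 'upperLimit'. For example, a call such as
    // getChangeSets(-5, 1000, 100) resolves to ids 1..100. Only from > to is
    // rejected as an error.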
    public Collection<ChangeSetIdentifier> getChangeSets(long from, long to, long upperLimit) throws DatabaseException {
        if (from > to)
            throw new DatabaseException("Illegal argument: from=" + from + " > to=" + to + ".");
        long min = Math.max(from, 1);
        long max = Math.min(to, upperLimit);
        return ChangeSetIdentifierImpl.longs2changeSetIds(this, min, max);
    }

    public boolean getChangeSets(long minCS, long maxCS, SynchronizeContextI context) throws DatabaseException {
        try {
            synchronizeContext = context;
            ChangeSetData t = dbSession.getChangeSetData(minCS, maxCS, context);
            return !t.isOk();
        } catch (Throwable e) {
            if (DEBUG)
                e.printStackTrace();
            if (e instanceof DatabaseException)
                throw (DatabaseException) e;
            else
                throw new DatabaseException("GetChangeSetData call to server failed.");
        } finally {
            synchronizeContext = null;
        }
    }

    public static void forExternals(Collection<Operation> operations, Function1<ExternalOperation, Boolean> fn) {
        for (Operation o : operations) {
            for (ExternalOperation e : o.getExternalOperations()) {
                if (!e.isDisposed())
                    fn.apply(e);
            }
        }
    }

    public boolean undo(Collection<Operation> changeSetIds, SynchronizeContextI context) throws DatabaseException {
        try {
            forExternals(changeSetIds, new FunctionImpl1<ExternalOperation, Boolean>() {
                @Override
                public Boolean apply(ExternalOperation op) {
                    op.undo();
                    return true;
                }
            });
            long[] cids = ChangeSetIdentifierImpl.operations2ints(changeSetIds);
            synchronizeContext = context;
            if (DEBUG)
                for (long id : cids)
                    System.out.println("DEBUG: Undo cs=" + id);
            return dbSession.undo(cids, context);
        } catch (Throwable e) {
            if (DEBUG)
                e.printStackTrace();
            if (e instanceof DatabaseException)
                throw (DatabaseException) e;
            else
                throw new DatabaseException("Undo call to server failed.", e);
        } finally {
            synchronizeContext = null;
        }
    }

    public ClusterUID[] getRefresh2(long csid) throws DatabaseException {
        Refresh t = dbSession.getRefresh(csid);
        if (DEBUG)
            System.err.println("getRefresh2 old cs=" + csid + " head cs=" + t.getHeadChangeSetId()
                    + " changed clusters=" + t.getFirst().length);
        ClusterUID[] clusters = new ClusterUID[t.getFirst().length];
        for (int i = 0; i < clusters.length; ++i)
            clusters[i] = ClusterUID.make(t.getFirst()[i], t.getSecond()[i]);
        return clusters;
    }

    /**
     * One segment of a (possibly large) resource value. Values longer than
     * 0xFFFF bytes are transferred in segments of at most 0xFFFF bytes; the
     * first segment also reports the total size of the whole value.
     */
    static final class ResourceSegment {
        final long valueSize; // total size of the whole value, in bytes
        final byte[] bytes;   // the bytes of this segment

        ResourceSegment(long valueSize, byte[] bytes) {
            this.valueSize = valueSize;
            this.bytes = bytes;
        }
    }

    protected abstract ResourceSegment getResourceSegment(int resourceIndex, ClusterUID clusterUID, long offset, short length) throws DatabaseException;

    public InputStream getResourceValueStream(final int resourceIndex, final ClusterUID clusterUID, long offset, int length) throws DatabaseException {
        if (DEBUG)
            System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID + " off=" + offset + " len=" + length);
        final int IMAX = 0xFFFF;
        short slen = (short) Math.min(length != 0 ? length : IMAX, IMAX);
        final ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, offset, slen);
        if (s.valueSize < 0)
            throw new DatabaseException("Failed to get value for resource index=" + resourceIndex
                    + " cluster=" + clusterUID + " off=" + offset + " len=" + length + " (1).");
        int ilen = (int) slen & 0xFFFF;
        assert (s.bytes.length <= ilen);
        if (0 == length) {
            if (s.valueSize > Integer.MAX_VALUE)
                throw new DatabaseException("Failed to get value for resource index=" + resourceIndex
                        + " cluster=" + clusterUID + " off=" + offset + " len=" + length
                        + ". Value size=" + s.valueSize + " (2).");
            length = (int) s.valueSize;
        }
        long rSize = s.valueSize - offset;
        if (rSize < length)
            throw new DatabaseException("Failed to get value for resource index=" + resourceIndex
                    + " cluster=" + clusterUID + " off=" + offset + " len=" + length
                    + ". Value size=" + s.valueSize + " (3).");
        else if (length <= IMAX)
            return new ByteArrayInputStream(s.bytes);
        final int finalLength = length;
        return new InputStream() {
            int left = finalLength;
            long valueOffset = 0;
            int offset = 0;
            ResourceSegment _s = s;

            @Override
            public int read() throws IOException {
                if (left <= 0)
                    return -1;
                if (offset == _s.bytes.length) {
                    // Current segment exhausted; fetch the next one.
                    short slen = (short) Math.min(left, IMAX);
                    valueOffset += _s.bytes.length;
                    try {
                        _s = getResourceSegment(resourceIndex, clusterUID, valueOffset, slen);
                    } catch (DatabaseException e) {
                        throw new IOException(e);
                    }
                    offset = 0;
                }
                left--;
                int result = _s.bytes[offset++];
                if (result < 0)
                    result += 256; // convert signed byte to unsigned int
                return result;
            }
        };
    }
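    // Large values are transferred in segments of at most 0xFFFF (65535)
    // bytes: the first segment reports the total value size, and the rest of
    // the bytes are fetched with follow-up getResourceSegment() calls. For
    // example, reading a 200000-byte value takes ceil(200000/65535) = 4
    // segment fetches. getResourceValue() below assembles the segments into
    // one byte[], while the stream variant above fetches them lazily.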
    public byte[] getResourceValue(int resourceIndex, ClusterUID clusterUID, long offset, int length) throws DatabaseException {
        if (DEBUG)
            System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID + " off=" + offset + " len=" + length);
        final int IMAX = 0xFFFF;
        short slen = (short) Math.min(length != 0 ? length : IMAX, IMAX);
        ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, offset, slen);
        final long VALUE_SIZE = s.valueSize;
        if (s.valueSize < 0)
            throw new DatabaseException("Failed to get value for resource index=" + resourceIndex
                    + " cluster=" + clusterUID + " off=" + offset + " len=" + length + " (1).");
        int ilen = (int) slen & 0xFFFF;
        assert (s.bytes.length <= ilen);
        if (0 == length) {
            if (s.valueSize > Integer.MAX_VALUE)
                throw new DatabaseException("Failed to get value for resource index=" + resourceIndex
                        + " cluster=" + clusterUID + " off=" + offset + " len=" + length
                        + ". Value size=" + s.valueSize + " (2).");
            length = (int) s.valueSize;
        }
        long rSize = s.valueSize - offset;
        if (rSize < length)
            throw new DatabaseException("Failed to get value for resource index=" + resourceIndex
                    + " cluster=" + clusterUID + " off=" + offset + " len=" + length
                    + ". Value size=" + s.valueSize + " (3).");
        else if (length <= IMAX)
            return s.bytes;
        byte[] bytes = new byte[length];
        int left = (int) length - s.bytes.length;
        int cur = s.bytes.length;
        offset += s.bytes.length;
        System.arraycopy(s.bytes, 0, bytes, 0, cur);
        while (left > 0) {
            slen = (short) Math.min(left, IMAX);
            s = getResourceSegment(resourceIndex, clusterUID, offset, slen);
            ilen = (int) slen & 0xFFFF;
            if (s.valueSize != VALUE_SIZE || s.bytes.length != ilen)
                throw new DatabaseException("Failed to get value for resource index=" + resourceIndex
                        + " cluster=" + clusterUID + " off=" + offset + " len=" + length
                        + ". Value size=" + s.valueSize + " blen=" + s.bytes.length + " (4).");
            System.arraycopy(s.bytes, 0, bytes, cur, s.bytes.length);
            left -= s.bytes.length;
            cur += s.bytes.length;
            offset += s.bytes.length;
        }
        return bytes;
    }

    public long getResourceValueSize(int resourceIndex, ClusterUID clusterUID) throws DatabaseException {
        if (DEBUG)
            System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID);
        final short SMAX = (short) 0xFFFF;
        ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, 0, SMAX);
        if (s.valueSize < 0)
            throw new DatabaseException("Failed to get value for resource index=" + resourceIndex
                    + " cluster=" + clusterUID + " size=" + s.valueSize);
        return s.valueSize;
    }

    int wait4RequestsLess(int limit) throws DatabaseException {
        dbSession.execute("");
        return 0;
    }

    interface Listener {
        void onChangeSetId(int thread, long csid, boolean refresh);
    }

    void setListener(Listener listener) {
        this.listener = listener;
    }

    protected void updateLastChangeSetId(int thread, final long csid, boolean refresh) {
        if (null != listener)
            listener.onChangeSetId(thread, csid, refresh);
        if (csid > lastChangeSetId) {
            lastChangeSetId = csid;
            final Iterator<ChangeSetListener> it = changeSetListeners.iterator();
            ThreadUtils.getBlockingWorkExecutor().execute(new Runnable() {
                @Override
                public void run() {
                    while (it.hasNext()) {
                        ChangeSetListener l = it.next();
                        try {
                            l.onChanged(csid);
                        } catch (Throwable t) {
                            Logger.defaultLogError(t);
                        }
                    }
                }
            });
        }
    }
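    // The remaining operations form the abstract server protocol of this
    // class; concrete subclasses bind them to the actual Database.Session
    // transport. askReadTransaction/askWriteTransaction open transactions,
    // acceptCommit/cancelCommit finish a write with or without publishing a
    // change set, and endTransaction releases the transaction.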
    protected abstract ServerInformationImpl getServerInformation() throws DatabaseException;

    public abstract void acceptCommit(long transactionId, long csid, byte[] metadata) throws DatabaseException;

    public abstract void cancelCommit(long transactionId, long csid, byte[] metadata, SynchronizeContextI context) throws DatabaseException;

    public abstract void endTransaction(long transactionId, boolean write) throws DatabaseException;

    public abstract long askWriteTransaction(int thread, long transactionId) throws DatabaseException;

    public abstract long askReadTransaction(int thread) throws DatabaseException;

    public abstract void stop() throws DatabaseException;

    public abstract long reserveIds(int count) throws DatabaseException;

    /**
     * Fixed-size ring buffer of change set metadata, keyed by a continuous
     * sequence of change set ids. A non-continuous insert resets the cache.
     */
    static class MetadataCache {
        private final boolean DEBUG = false;
        private final int SIZE = 10;
        private int lastInd; // index of last inserted element
        private long lastId; // id of last inserted element
        private int count;   // number of inserted elements
        Vector<byte[]> buffer;

        MetadataCache() {
            init();
        }

        public int clear() {
            int ret = count;
            init();
            return ret;
        }

        private void init() {
            if (DEBUG)
                System.out.println("DEBUG: MetadataCache init");
            lastInd = -1;
            lastId = 0;
            count = 0;
            buffer = new Vector<byte[]>();
            buffer.setSize(SIZE);
        }

        private boolean incLastInd() {
            ++lastInd;
            if (lastInd >= SIZE) {
                lastInd = 0;
                return true; // wrapped around
            } else
                return false;
        }

        synchronized void addNext(long id, byte[] data) throws DatabaseException {
            if (DEBUG)
                System.out.println("DEBUG: MetadataCache add id=" + id);
            if (lastId != 0 && lastId != id - 1)
                init(); // Only a continuous id sequence is supported.
            incLastInd();
            buffer.set(lastInd, data);
            lastId = id;
            if (count < SIZE)
                ++count;
        }

        synchronized byte[] get(long id) {
            if (DEBUG)
                System.out.println("DEBUG: MetadataCache get id=" + id);
            if (id > lastId || id <= lastId - count)
                return null;
            int ind = lastInd - (int) (lastId - id);
            if (ind < 0)
                ind = SIZE + ind; // wrap backwards in the ring buffer
            byte[] found = buffer.get(ind);
            if (DEBUG)
                if (null != found)
                    System.out.println("DEBUG: MetadataCache found id=" + id);
            return found;
        }
    }

    public ClusterUID[] listClusters() throws InternalException {
        ClusterIds t = dbSession.getClusterIds();
        long[] first = t.getFirst();
        long[] second = t.getSecond();
        int N1 = (null == first) ? 0 : first.length;
        N1 = Math.min(N1, t.getStatus());
        int N2 = (null == second) ? 0 : second.length;
        N2 = Math.min(N1, N2);
        ClusterUID[] clusters = new ClusterUID[N2];
        for (int i = 0; i < N2; ++i)
            clusters[i] = ClusterUID.make(first[i], second[i]);
        return clusters;
    }
}

class BuiltinData {
    final long id;
    final long cluster;
    SoftReference<ResourceImpl> weakResource = null;

    BuiltinData(long id, long cluster) {
        this.id = id;
        this.cluster = cluster;
    }
}

class UpdateClusterFunction {
    public byte[] operation = null;

    public UpdateClusterFunction(byte[] operation) {
        this.operation = operation;
    }
}