X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.databoard%2Fsrc%2Forg%2Fsimantics%2Fdataboard%2Faccessor%2Fbinary%2FBinaryVariableWidthStreamArray.java;h=3614efdba6fba4ee70e38c8a3a687df52c89dd78;hb=refs%2Fchanges%2F15%2F3015%2F1;hp=b070008f00823cbfde279ba2908b211c724e6430;hpb=969bd23cab98a79ca9101af33334000879fb60c5;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.databoard/src/org/simantics/databoard/accessor/binary/BinaryVariableWidthStreamArray.java b/bundles/org.simantics.databoard/src/org/simantics/databoard/accessor/binary/BinaryVariableWidthStreamArray.java index b070008f0..3614efdba 100644 --- a/bundles/org.simantics.databoard/src/org/simantics/databoard/accessor/binary/BinaryVariableWidthStreamArray.java +++ b/bundles/org.simantics.databoard/src/org/simantics/databoard/accessor/binary/BinaryVariableWidthStreamArray.java @@ -1,1066 +1,1066 @@ -/******************************************************************************* - * Copyright (c) 2010 Association for Decentralized Information Management in - * Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ -package org.simantics.databoard.accessor.binary; - -import java.io.IOException; -import java.lang.ref.WeakReference; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.SortedMap; -import java.util.TreeMap; - -import org.simantics.databoard.Bindings; -import org.simantics.databoard.accessor.Accessor; -import org.simantics.databoard.accessor.ArrayAccessor; -import org.simantics.databoard.accessor.CloseableAccessor; -import org.simantics.databoard.accessor.StreamAccessor; -import org.simantics.databoard.accessor.error.AccessorConstructionException; -import org.simantics.databoard.accessor.error.AccessorException; -import org.simantics.databoard.accessor.error.ReferenceException; -import org.simantics.databoard.accessor.event.ArrayElementAdded; -import org.simantics.databoard.accessor.event.ArrayElementRemoved; -import org.simantics.databoard.accessor.event.Event; -import org.simantics.databoard.accessor.event.ValueAssigned; -import org.simantics.databoard.accessor.file.FileArrayAccessor; -import org.simantics.databoard.accessor.impl.AccessorParams; -import org.simantics.databoard.accessor.impl.ListenerEntry; -import org.simantics.databoard.accessor.interestset.ArrayInterestSet; -import org.simantics.databoard.accessor.interestset.InterestSet; -import org.simantics.databoard.accessor.reference.ChildReference; -import org.simantics.databoard.accessor.reference.IndexReference; -import org.simantics.databoard.accessor.reference.LabelReference; -import org.simantics.databoard.adapter.AdaptException; -import org.simantics.databoard.binding.ArrayBinding; -import org.simantics.databoard.binding.Binding; -import org.simantics.databoard.binding.error.BindingException; -import org.simantics.databoard.binding.mutable.MutableVariant; -import org.simantics.databoard.serialization.Serializer; -import org.simantics.databoard.serialization.SerializerConstructionException; -import org.simantics.databoard.type.ArrayType; -import org.simantics.databoard.type.Datatype; -import org.simantics.databoard.type.LongType; -import org.simantics.databoard.util.binary.Blob; -import org.simantics.databoard.util.binary.RandomAccessBinary.ByteSide; - -/** - * Binary Array is accessor to a byte backed array of elements. - *

- * - * Note, To increase the random access performance of the record, create sub-accessors of - * its fields. - * - * @author Toni Kalajainen - */ -public class BinaryVariableWidthStreamArray extends BinaryObject implements ArrayAccessor, FileArrayAccessor, ArrayAccessor.CloseableArrayAccessor, StreamAccessor { - - /** Accessors to children */ - TreeMap> children = new TreeMap>(); - - Binding cb; - Serializer cs; - Integer constantSize; - ArrayAccessor index; - - public BinaryVariableWidthStreamArray(BinaryObject parent, Blob blob, Datatype type, AccessorParams params, ArrayAccessor index) - throws AccessorConstructionException - { - super(parent, blob, type, params); - ArrayType at = (ArrayType) type; - cb = params.bindingScheme.getBindingUnchecked(at.componentType); - cs = params.serializerScheme.getSerializerUnchecked( cb ); - constantSize = cs.getConstantSize(); - if (index==null || index.type().componentType instanceof LongType == false) { - throw new AccessorConstructionException("Index must be Long[]"); - } - this.index = index; - } - - public ArrayType type() { - return (ArrayType) type; - } - - /** - * Get existing sub accessor - * @param index - * @return sub-accessor or null - */ - BinaryObject getExistingAccessor(int index) - { - java.lang.ref.Reference ref = children.get(index); - if (ref==null) return null; - BinaryObject res = (BinaryObject) ref.get(); -// if (res==null) children.remove(index); - return res; - } - - /** - * Get start position of a field - * - * @param fieldIndex - * @return - * @throws AccessorException - */ - long getStartPosition(int fieldIndex) throws AccessorException { - int c = this.index.size(); - if ( fieldIndex==c ) - try { - return b.length(); - } catch (IOException e) { - throw new AccessorException( e ); - } - if ( fieldIndex>c || fieldIndex<0 ) throw new AccessorException("Index out of bounds"); - return (Long) index.get(fieldIndex, Bindings.LONG); - } - - long getLength(int index, long pos) throws AccessorException { - int c = this.index.size(); - if ( index>=c || index<0 ) return 0; - if ( index==c-1 ) - { - try { - return b.length() - (Long) this.index.get(c-1, Bindings.LONG); - } catch (IOException e) { - throw new AccessorException( e ); - } - } else { - return (Long) this.index.get(index+1, Bindings.LONG) - (Long) this.index.get(index, Bindings.LONG); - } - } - - @Override - public void setNoflush(int index, Binding rcb, Object rcv) - throws AccessorException { - assert b.isOpen(); - - writeLock(); - try { - - int count = size(); - if ( index==count ) { - addNoflush(count, rcb, rcv); - return; - } - - // Write - Serializer rcs = params.serializerScheme.getSerializer( rcb ); - long pos = getStartPosition(index); - long oldSize = getLength(index, pos); - long newSize = rcs.getSize(rcv, null); - b.position(pos); - long diff = newSize - oldSize; - if (diff>0) { - b.insertBytes( newSize - oldSize, ByteSide.Right ); - } else if (diff<0) { - b.removeBytes( oldSize - newSize, ByteSide.Right ); - } - b.position(pos); - rcs.serialize(b, null, rcv); - - // Update index of the higher indices - if ( diff!= 0 ) { - for (int i=index+1; i> pm = children.subMap(index+1, true, lastKey, true); - for (Entry> e : pm.entrySet()) { - BinaryObject sa_ = e.getValue().get(); - if (sa_ == null) continue; - sa_.b.setPositionInSource( sa_.b.getStartPositionInSourceBinary() + diff, sa_.b.length()); - } - } - }*/ - - // Notify - ListenerEntry le = listeners; - while (le!=null) { - ArrayInterestSet is = le.getInterestSet(); - if (is.inNotificationsOf(index)) { - MutableVariant newValue = null; - if (is.inValues()) newValue = new MutableVariant(rcb, rcb.isImmutable() ? rcv : rcb.clone(rcv)); - - Event e = new ValueAssigned(new IndexReference(index), newValue); - emitEvent(le, e); - } - le = le.next; - } - - } catch (IOException e) { - throw new AccessorException(e); - } catch (AdaptException e) { - throw new AccessorException(e); - } catch (SerializerConstructionException e) { - throw new AccessorException(e); - } finally { - writeUnlock(); - } - - } - - /** - * Set all values - * - * @param arrayBinding - * @param newArray - */ - @Override - public void setValueNoflush(Binding arrayBinding, Object newArray) - throws AccessorException { - assert b.isOpen(); - writeLock(); - try { - // Write - ArrayBinding rb = ((ArrayBinding)arrayBinding); - Binding rcb = rb.getComponentBinding(); - Serializer rcs = params.serializerScheme.getSerializer( rcb ); - int oldCount = index.size(); - int newCount = rb.size(newArray); - b.setLength( params.serializerScheme.getSerializer( rb ).getSize(newArray) ); - index.setSize(newCount); - - // Serialize - for (int index=0; index=newCount; index--) { - BinaryObject sa = getExistingAccessor(index); - if (sa!=null) { - sa.invalidatedNotification(); - children.remove(index); - sa = null; - } - - // Notify changes - ListenerEntry le = listeners; - while (le!=null) { - ArrayInterestSet is = le.getInterestSet(); - if (is.inNotificationsOf(index)) { - Event e = new ArrayElementRemoved(index); - emitEvent(le, e); - } - le = le.next; - } - } - - // Notify new assignment - if (listeners!=null) { - for (int index=0; indexoldCount) throw new AccessorException("Index out of range"); - - long pos = getStartPosition(index); - int size = rcs.getSize(rcv); - b.position(pos); - b.insertBytes(size, ByteSide.Left); - rcs.serialize(b, null, rcv); - - this.index.add(index, Bindings.LONG, pos); - - for (int i=index+1; i= index) { - java.lang.ref.Reference value = children.remove(key); - if (value.get()!=null) children.put(key+1, value); - key = children.lowerKey(key); - } - } - - // Notify Listeners - ListenerEntry le = listeners; - while (le!=null) { - ArrayInterestSet is = le.getInterestSet(); - if (is.inNotifications()) { - MutableVariant newValue = null; - if (is.inValues()) newValue = new MutableVariant(rcb, rcb.isImmutable() ? rcv : rcb.clone(rcv)); - ArrayElementAdded e = new ArrayElementAdded(index, newValue); - emitEvent(le, e); - } - - // Update indices of interest sets - if (is.componentInterests!=null) { - Map oldCis = is.componentInterests; - boolean needUpdates = false; - for (Integer i : oldCis.keySet()) { - needUpdates |= i>=index; - if (needUpdates) break; - } - - if (needUpdates) { - Map newCis = new HashMap(oldCis.size()); - for (Integer i : oldCis.keySet()) - { - Integer oldKey = i; - Integer newKey = i>=index ? i+1 : i; - InterestSet oldValue = oldCis.get(oldKey); - newCis.put(newKey, oldValue); - } - is.componentInterests = newCis; - } - } - - // Add component interest listener - /* - boolean hadSa = getExistingAccessor(index) != null; - if (!hadSa) { - // Add component interest listener - InterestSet cis = is.getComponentInterest(); - if (cis != null) { - Accessor sa = getAccessor(index); - } - cis = is.getComponentInterest(index); - if (cis != null) { - Accessor sa = getAccessor(index); - } - } */ - - le = le.next; - } - - } catch (IOException e) { - throw new AccessorException(e); - } catch (AdaptException e) { - throw new AccessorException(e); - } catch (SerializerConstructionException e) { - throw new AccessorException(e); - } finally { - writeUnlock(); - } - } - - @Override - public void addNoflush(Binding binding, Object value) - throws AccessorException { - addNoflush(size(), binding, value); - } - - @Override - public void addAllNoflush(Binding binding, Object[] values) - throws AccessorException { - addAllNoflush(size(), binding, values); - } - - @Override - public void addAllNoflush(int index, Binding rcb, Object[] rcvs) - throws AccessorException { - if (index<0||index>size()) throw new AccessorException("Index out of bounds"); - assert b.isOpen(); - writeLock(); - try { - Serializer rcs = params.serializerScheme.getSerializer( rcb ); - // Write - b.position(0L); - int repeatCount = rcvs.length; - int oldCount = b.readInt(); - int newCount = oldCount + rcvs.length; - if (index>oldCount) throw new AccessorException("Index out of range"); - boolean lastEntry = index == oldCount; - - long size = 0; - for (int i=0; i= index) { - java.lang.ref.Reference value = children.remove(key); - if (value.get()!=null) children.put(key+rcvs.length, value); - key = children.lowerKey(key); - } - } - - // Notify Listeners - ListenerEntry le = listeners; - while (le!=null) { - ArrayInterestSet is = le.getInterestSet(); - if (is.inNotifications()) { - for (int i=0; i oldCis = is.componentInterests; - boolean needUpdates = false; - for (Integer i : oldCis.keySet()) { - needUpdates |= i>=index; - if (needUpdates) break; - } - - if (needUpdates) { - Map newCis = new HashMap(oldCis.size()); - for (Integer i : oldCis.keySet()) - { - Integer oldKey = i; - Integer newKey = i>=index ? i+rcvs.length : i; - InterestSet oldValue = oldCis.get(oldKey); - newCis.put(newKey, oldValue); - } - is.componentInterests = newCis; - } - - // Add component interest listener - /* - for (int i = index; isize()) throw new AccessorException("Index out of bounds"); - assert b.isOpen(); - writeLock(); - try { - Serializer rcs = params.serializerScheme.getSerializer( rcb ); - // Write - b.position(0L); - int oldCount = this.index.size(); - int newCount = oldCount + repeatCount; - if (index>oldCount) throw new AccessorException("Index out of range"); - boolean lastEntry = index == oldCount; - - long componentSize = rcs.getSize(obj); - long size = componentSize * repeatCount; - long pos = getStartPosition(index); - b.position(pos); - b.insertBytes(size, ByteSide.Right); - long p = pos; - for (int i=0; i= index) { - java.lang.ref.Reference value = children.remove(key); - if (value.get()!=null) children.put(key+repeatCount, value); - key = children.lowerKey(key); - } - } - - // Notify Listeners - ListenerEntry le = listeners; - while (le!=null) { - ArrayInterestSet is = le.getInterestSet(); - if (is.inNotifications()) { - for (int i=0; i oldCis = is.componentInterests; - boolean needUpdates = false; - for (Integer i : oldCis.keySet()) { - needUpdates |= i>=index; - if (needUpdates) break; - } - - if (needUpdates) { - Map newCis = new HashMap(oldCis.size()); - for (Integer i : oldCis.keySet()) - { - Integer oldKey = i; - Integer newKey = i>=index ? i+repeatCount : i; - InterestSet oldValue = oldCis.get(oldKey); - newCis.put(newKey, oldValue); - } - is.componentInterests = newCis; - } - - } - - le = le.next; - } - - } catch (IOException e) { - throw new AccessorException(e); - } catch (SerializerConstructionException e) { - throw new AccessorException(e); - } finally { - writeUnlock(); - } - } - - @Override - public void removeNoflush(int index, int count) throws AccessorException { - assert b.isOpen(); - writeLock(); - try { - // Write - boolean lastEntry = index == count; - int oldCount = this.index.size(); - int newCount = oldCount - count; - if (index<0||index+count>oldCount) throw new AccessorException("Index out of bounds"); - long pos = getStartPosition(index); - long lastPos = getStartPosition(index+count-1); - long lastLen = getLength(index, lastPos); - long end = lastPos + lastLen; - long len = end - pos; - b.position(pos); - b.removeBytes(len, ByteSide.Right); - this.index.remove(index, count); - for (int i=index; i> sm = children.subMap(index, true, index+count, false); - for (Entry> e : sm.entrySet()) { - BinaryObject bo = e.getValue().get(); - if (bo==null) continue; - bo.invalidatedNotification(); - } - sm.clear(); - - // Update the keys of consecutive children - if (!lastEntry && !children.isEmpty()) { - Integer lastKey = children.lastKey(); - Integer key = children.higherKey(index); - while (key != null && key <= lastKey) { - java.lang.ref.Reference value = children.remove(key); - if (value.get()!=null) children.put(key-count, value); - key = children.higherKey(key); - } - } - - // Notify Listeners - ListenerEntry le = listeners; - while (le!=null) { - ArrayInterestSet is = le.getInterestSet(); - if (is.inNotifications()) { - ArrayElementRemoved e = new ArrayElementRemoved(index); - emitEvent(le, e); - } - - // Update indices of interest sets - if (is.componentInterests!=null) { - Map oldCis = is.componentInterests; - boolean needUpdates = false; - for (Integer i : oldCis.keySet()) { - needUpdates |= i>=index; - if (needUpdates) break; - } - - if (needUpdates) { - Map newCis = new HashMap(oldCis.size()); - for (Integer i : oldCis.keySet()) - { - Integer oldKey = i; - Integer newKey = i>=index ? i-1 : i; - InterestSet oldValue = oldCis.get(oldKey); - newCis.put(newKey, oldValue); - } - is.componentInterests = newCis; - } - } - le = le.next; - } - - - } catch (IOException e) { - throw new AccessorException( e ); - } finally { - writeUnlock(); - } - } - - @Override - public Object get(int index, Binding valueBinding) throws AccessorException { - assert b.isOpen(); - readLock(); - try { - long pos = getStartPosition(index); - b.position(pos); - Serializer s = params.serializerScheme.getSerializer(valueBinding); - return s.deserialize(b); - } catch (IOException e) { - throw new AccessorException(e); - } catch (SerializerConstructionException e) { - throw new AccessorException(e); - } finally { - readUnlock(); - } - } - - @Override - public void get(int index, Binding valueBinding, Object dst) throws AccessorException { - assert b.isOpen(); - readLock(); - try { - long pos = getStartPosition(index); - b.position(pos); - Serializer s = params.serializerScheme.getSerializer(valueBinding); - s.deserializeTo(b, dst); - } catch (IOException e) { - throw new AccessorException(e); - } catch (SerializerConstructionException e) { - throw new AccessorException(e); - } finally { - readUnlock(); - } - } - - - @SuppressWarnings("unchecked") - @Override - public T getAccessor(int index) - throws AccessorConstructionException { - assert b.isOpen(); - readLock(); - try { - b.position(0L); - int count = b.readInt(); - if (index<0 || index>=count) throw new ReferenceException("Element index ("+index+") out of bounds ("+count+")"); - - // Get existing or create new - BinaryObject sa = getExistingAccessor(index); - if (sa==null) { - long pos = getStartPosition(index); - long len = getLength(index, pos); - - // Instantiate correct sub accessor. - sa = createSubAccessor(cb.type(), pos, len, params); - children.put(index, new WeakReference(sa) ); - - // Add component interest sets - ListenerEntry le = listeners; - while (le!=null) { - ArrayInterestSet is = le.getInterestSet(); - - // Generic element interest - InterestSet gis = is.getComponentInterest(); - if (gis != null) { - try { - ChildReference childPath = ChildReference.concatenate(le.path, new IndexReference(index) ); - sa.addListener(le.listener, gis, childPath, le.executor); - } catch (AccessorException e) { - throw new AccessorConstructionException(e); - } - } - - // Specific element interest - InterestSet cis = is.getComponentInterest(index); - if (cis != null) { - try { - ChildReference childPath = ChildReference.concatenate(le.path, new IndexReference(index) ); - sa.addListener(le.listener, cis, childPath,le.executor); - } catch (AccessorException e) { - throw new AccessorConstructionException(e); - } - } - - // Next listener - le = le.next; - } - - } - - return (T) sa; - } catch (IOException e) { - throw new AccessorConstructionException(e); - } catch (AccessorException e) { - throw new AccessorConstructionException(e); - } finally { - readUnlock(); - } - } - - @SuppressWarnings("unchecked") - @Override - public T getComponent(ChildReference reference) - throws AccessorConstructionException { - if (reference==null) return (T) this; - if (reference instanceof LabelReference) { - LabelReference lr = (LabelReference) reference; - try { - Integer index = new Integer( lr.label ); - Accessor result = getAccessor(index); - if (reference.getChildReference() != null) - result = result.getComponent(reference.getChildReference()); - return (T) result; - } catch ( NumberFormatException nfe ) { - throw new ReferenceException(nfe); - } - } else if (reference instanceof IndexReference) { - IndexReference ref = (IndexReference) reference; - int index = ref.getIndex(); - Accessor result = getAccessor(index); - if (reference.getChildReference() != null) - result = result.getComponent(reference.getChildReference()); - return (T) result; - } throw new ReferenceException(reference.getClass().getName()+" is not a reference of an array"); - } - - @Override - public void getAll(Binding valueBinding, Object[] array) - throws AccessorException { - assert b.isOpen(); - readLock(); - try { - b.position(0L); - int size = b.readInt(); - if (size > array.length) throw new AccessorException("Argument array too short"); - Serializer s = params.serializerScheme.getSerializer(valueBinding); - for (int i=0; i values) - throws AccessorException { - assert b.isOpen(); - readLock(); - try { - b.position(0L); - int size = b.readInt(); - Serializer s = params.serializerScheme.getSerializer(valueBinding); - for (int i=0; ioldSize) { - Object dummy = cb.createDefault(); - int count = newSize-oldSize; - addRepeatNoflush(oldSize, cb, dummy, count); - } - - } catch (BindingException e) { - throw new AccessorException( e ); - } finally { - writeUnlock(); - } - } - - @Override - public void setSize(int newSize) throws AccessorException { - assert b.isOpen(); - writeLock(); - try { - setSizeNoflush(newSize); - b.flush(); - } catch (IOException e) { - throw new AccessorException( e ); - } finally { - writeUnlock(); - } - } - - @Override - public int size() throws AccessorException { - assert b.isOpen(); - readLock(); - return index.size(); - } - - @Override - Event applyLocal(Event e, boolean makeRollback) throws AccessorException { - Event rollback = null; - if (e instanceof ValueAssigned) { - ValueAssigned va = (ValueAssigned) e; - if (makeRollback) rollback = new ValueAssigned(cb, getValue(cb)); - setValueNoflush(va.newValue.getBinding(), va.newValue.getValue()); - } else - if (e instanceof ArrayElementAdded) { - ArrayElementAdded aa = (ArrayElementAdded) e; - addNoflush(aa.index, aa.value.getBinding(), aa.value.getValue()); - if (makeRollback) rollback = new ArrayElementRemoved(aa.index); - } else if (e instanceof ArrayElementRemoved) { - ArrayElementRemoved ar = (ArrayElementRemoved) e; - if (ar.index<0 || ar.index >=size()) throw new AccessorException("Array index out of bounds"); - if (makeRollback) { - Object cv = get(ar.index, cb); - rollback = new ArrayElementAdded(ar.index, new MutableVariant(cb, cv)); - } - removeNoflush(ar.index, 1); - } else { - throw new AccessorException("Cannot apply "+e.getClass().getName()+" to Array"); - } - - return rollback; - } - - @Override - public void add(Binding binding, Object value) throws AccessorException { - assert b.isOpen(); - writeLock(); - try { - addNoflush(binding, value); - b.flush(); - } catch (IOException e) { - throw new AccessorException( e ); - } finally { - writeUnlock(); - } - } - - @Override - public void add(int index, Binding binding, Object value) - throws AccessorException { - assert b.isOpen(); - writeLock(); - try { - addNoflush(index, binding, value); - b.flush(); - } catch (IOException e) { - throw new AccessorException( e ); - } finally { - writeUnlock(); - } - } - - @Override - public void addAll(Binding binding, Object[] values) - throws AccessorException { - assert b.isOpen(); - writeLock(); - try { - addAllNoflush(binding, values); - b.flush(); - } catch (IOException e) { - throw new AccessorException( e ); - } finally { - writeUnlock(); - } - } - - @Override - public void addAll(int index, Binding binding, Object[] values) - throws AccessorException { - assert b.isOpen(); - writeLock(); - try { - addAllNoflush(index, binding, values); - b.flush(); - } catch (IOException e) { - throw new AccessorException( e ); - } finally { - writeUnlock(); - } - } - - @Override - public void remove(int index, int count) throws AccessorException { - assert b.isOpen(); - writeLock(); - try { - removeNoflush(index, count); - b.flush(); - } catch (IOException e) { - throw new AccessorException( e ); - } finally { - writeUnlock(); - } - } - - @Override - public void set(int index, Binding binding, Object value) - throws AccessorException { - assert b.isOpen(); - writeLock(); - try { - setNoflush(index, binding, value); - b.flush(); - } catch (IOException e) { - throw new AccessorException(e); - } finally { - writeUnlock(); - } - } - - @Override - public void close() throws AccessorException { - if ( this.index instanceof CloseableAccessor ) { - CloseableAccessor ca = (CloseableAccessor) this.index; - ca.close(); - } - super.close(); - } - -} - +/******************************************************************************* + * Copyright (c) 2010 Association for Decentralized Information Management in + * Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package org.simantics.databoard.accessor.binary; + +import java.io.IOException; +import java.lang.ref.WeakReference; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.simantics.databoard.Bindings; +import org.simantics.databoard.accessor.Accessor; +import org.simantics.databoard.accessor.ArrayAccessor; +import org.simantics.databoard.accessor.CloseableAccessor; +import org.simantics.databoard.accessor.StreamAccessor; +import org.simantics.databoard.accessor.error.AccessorConstructionException; +import org.simantics.databoard.accessor.error.AccessorException; +import org.simantics.databoard.accessor.error.ReferenceException; +import org.simantics.databoard.accessor.event.ArrayElementAdded; +import org.simantics.databoard.accessor.event.ArrayElementRemoved; +import org.simantics.databoard.accessor.event.Event; +import org.simantics.databoard.accessor.event.ValueAssigned; +import org.simantics.databoard.accessor.file.FileArrayAccessor; +import org.simantics.databoard.accessor.impl.AccessorParams; +import org.simantics.databoard.accessor.impl.ListenerEntry; +import org.simantics.databoard.accessor.interestset.ArrayInterestSet; +import org.simantics.databoard.accessor.interestset.InterestSet; +import org.simantics.databoard.accessor.reference.ChildReference; +import org.simantics.databoard.accessor.reference.IndexReference; +import org.simantics.databoard.accessor.reference.LabelReference; +import org.simantics.databoard.adapter.AdaptException; +import org.simantics.databoard.binding.ArrayBinding; +import org.simantics.databoard.binding.Binding; +import org.simantics.databoard.binding.error.BindingException; +import org.simantics.databoard.binding.mutable.MutableVariant; +import org.simantics.databoard.serialization.Serializer; +import org.simantics.databoard.serialization.SerializerConstructionException; +import org.simantics.databoard.type.ArrayType; +import org.simantics.databoard.type.Datatype; +import org.simantics.databoard.type.LongType; +import org.simantics.databoard.util.binary.Blob; +import org.simantics.databoard.util.binary.RandomAccessBinary.ByteSide; + +/** + * Binary Array is accessor to a byte backed array of elements. + *

+ * + * Note, To increase the random access performance of the record, create sub-accessors of + * its fields. + * + * @author Toni Kalajainen + */ +public class BinaryVariableWidthStreamArray extends BinaryObject implements ArrayAccessor, FileArrayAccessor, ArrayAccessor.CloseableArrayAccessor, StreamAccessor { + + /** Accessors to children */ + TreeMap> children = new TreeMap>(); + + Binding cb; + Serializer cs; + Integer constantSize; + ArrayAccessor index; + + public BinaryVariableWidthStreamArray(BinaryObject parent, Blob blob, Datatype type, AccessorParams params, ArrayAccessor index) + throws AccessorConstructionException + { + super(parent, blob, type, params); + ArrayType at = (ArrayType) type; + cb = params.bindingScheme.getBindingUnchecked(at.componentType); + cs = params.serializerScheme.getSerializerUnchecked( cb ); + constantSize = cs.getConstantSize(); + if (index==null || index.type().componentType instanceof LongType == false) { + throw new AccessorConstructionException("Index must be Long[]"); + } + this.index = index; + } + + public ArrayType type() { + return (ArrayType) type; + } + + /** + * Get existing sub accessor + * @param index + * @return sub-accessor or null + */ + BinaryObject getExistingAccessor(int index) + { + java.lang.ref.Reference ref = children.get(index); + if (ref==null) return null; + BinaryObject res = (BinaryObject) ref.get(); +// if (res==null) children.remove(index); + return res; + } + + /** + * Get start position of a field + * + * @param fieldIndex + * @return + * @throws AccessorException + */ + long getStartPosition(int fieldIndex) throws AccessorException { + int c = this.index.size(); + if ( fieldIndex==c ) + try { + return b.length(); + } catch (IOException e) { + throw new AccessorException( e ); + } + if ( fieldIndex>c || fieldIndex<0 ) throw new AccessorException("Index out of bounds"); + return (Long) index.get(fieldIndex, Bindings.LONG); + } + + long getLength(int index, long pos) throws AccessorException { + int c = this.index.size(); + if ( index>=c || index<0 ) return 0; + if ( index==c-1 ) + { + try { + return b.length() - (Long) this.index.get(c-1, Bindings.LONG); + } catch (IOException e) { + throw new AccessorException( e ); + } + } else { + return (Long) this.index.get(index+1, Bindings.LONG) - (Long) this.index.get(index, Bindings.LONG); + } + } + + @Override + public void setNoflush(int index, Binding rcb, Object rcv) + throws AccessorException { + assert b.isOpen(); + + writeLock(); + try { + + int count = size(); + if ( index==count ) { + addNoflush(count, rcb, rcv); + return; + } + + // Write + Serializer rcs = params.serializerScheme.getSerializer( rcb ); + long pos = getStartPosition(index); + long oldSize = getLength(index, pos); + long newSize = rcs.getSize(rcv, null); + b.position(pos); + long diff = newSize - oldSize; + if (diff>0) { + b.insertBytes( newSize - oldSize, ByteSide.Right ); + } else if (diff<0) { + b.removeBytes( oldSize - newSize, ByteSide.Right ); + } + b.position(pos); + rcs.serialize(b, null, rcv); + + // Update index of the higher indices + if ( diff!= 0 ) { + for (int i=index+1; i> pm = children.subMap(index+1, true, lastKey, true); + for (Entry> e : pm.entrySet()) { + BinaryObject sa_ = e.getValue().get(); + if (sa_ == null) continue; + sa_.b.setPositionInSource( sa_.b.getStartPositionInSourceBinary() + diff, sa_.b.length()); + } + } + }*/ + + // Notify + ListenerEntry le = listeners; + while (le!=null) { + ArrayInterestSet is = le.getInterestSet(); + if (is.inNotificationsOf(index)) { + MutableVariant newValue = null; + if (is.inValues()) newValue = new MutableVariant(rcb, rcb.isImmutable() ? rcv : rcb.clone(rcv)); + + Event e = new ValueAssigned(new IndexReference(index), newValue); + emitEvent(le, e); + } + le = le.next; + } + + } catch (IOException e) { + throw new AccessorException(e); + } catch (AdaptException e) { + throw new AccessorException(e); + } catch (SerializerConstructionException e) { + throw new AccessorException(e); + } finally { + writeUnlock(); + } + + } + + /** + * Set all values + * + * @param arrayBinding + * @param newArray + */ + @Override + public void setValueNoflush(Binding arrayBinding, Object newArray) + throws AccessorException { + assert b.isOpen(); + writeLock(); + try { + // Write + ArrayBinding rb = ((ArrayBinding)arrayBinding); + Binding rcb = rb.getComponentBinding(); + Serializer rcs = params.serializerScheme.getSerializer( rcb ); + int oldCount = index.size(); + int newCount = rb.size(newArray); + b.setLength( params.serializerScheme.getSerializer( rb ).getSize(newArray) ); + index.setSize(newCount); + + // Serialize + for (int index=0; index=newCount; index--) { + BinaryObject sa = getExistingAccessor(index); + if (sa!=null) { + sa.invalidatedNotification(); + children.remove(index); + sa = null; + } + + // Notify changes + ListenerEntry le = listeners; + while (le!=null) { + ArrayInterestSet is = le.getInterestSet(); + if (is.inNotificationsOf(index)) { + Event e = new ArrayElementRemoved(index); + emitEvent(le, e); + } + le = le.next; + } + } + + // Notify new assignment + if (listeners!=null) { + for (int index=0; indexoldCount) throw new AccessorException("Index out of range"); + + long pos = getStartPosition(index); + int size = rcs.getSize(rcv); + b.position(pos); + b.insertBytes(size, ByteSide.Left); + rcs.serialize(b, null, rcv); + + this.index.add(index, Bindings.LONG, pos); + + for (int i=index+1; i= index) { + java.lang.ref.Reference value = children.remove(key); + if (value.get()!=null) children.put(key+1, value); + key = children.lowerKey(key); + } + } + + // Notify Listeners + ListenerEntry le = listeners; + while (le!=null) { + ArrayInterestSet is = le.getInterestSet(); + if (is.inNotifications()) { + MutableVariant newValue = null; + if (is.inValues()) newValue = new MutableVariant(rcb, rcb.isImmutable() ? rcv : rcb.clone(rcv)); + ArrayElementAdded e = new ArrayElementAdded(index, newValue); + emitEvent(le, e); + } + + // Update indices of interest sets + if (is.componentInterests!=null) { + Map oldCis = is.componentInterests; + boolean needUpdates = false; + for (Integer i : oldCis.keySet()) { + needUpdates |= i>=index; + if (needUpdates) break; + } + + if (needUpdates) { + Map newCis = new HashMap(oldCis.size()); + for (Integer i : oldCis.keySet()) + { + Integer oldKey = i; + Integer newKey = i>=index ? i+1 : i; + InterestSet oldValue = oldCis.get(oldKey); + newCis.put(newKey, oldValue); + } + is.componentInterests = newCis; + } + } + + // Add component interest listener + /* + boolean hadSa = getExistingAccessor(index) != null; + if (!hadSa) { + // Add component interest listener + InterestSet cis = is.getComponentInterest(); + if (cis != null) { + Accessor sa = getAccessor(index); + } + cis = is.getComponentInterest(index); + if (cis != null) { + Accessor sa = getAccessor(index); + } + } */ + + le = le.next; + } + + } catch (IOException e) { + throw new AccessorException(e); + } catch (AdaptException e) { + throw new AccessorException(e); + } catch (SerializerConstructionException e) { + throw new AccessorException(e); + } finally { + writeUnlock(); + } + } + + @Override + public void addNoflush(Binding binding, Object value) + throws AccessorException { + addNoflush(size(), binding, value); + } + + @Override + public void addAllNoflush(Binding binding, Object[] values) + throws AccessorException { + addAllNoflush(size(), binding, values); + } + + @Override + public void addAllNoflush(int index, Binding rcb, Object[] rcvs) + throws AccessorException { + if (index<0||index>size()) throw new AccessorException("Index out of bounds"); + assert b.isOpen(); + writeLock(); + try { + Serializer rcs = params.serializerScheme.getSerializer( rcb ); + // Write + b.position(0L); + int repeatCount = rcvs.length; + int oldCount = b.readInt(); + int newCount = oldCount + rcvs.length; + if (index>oldCount) throw new AccessorException("Index out of range"); + boolean lastEntry = index == oldCount; + + long size = 0; + for (int i=0; i= index) { + java.lang.ref.Reference value = children.remove(key); + if (value.get()!=null) children.put(key+rcvs.length, value); + key = children.lowerKey(key); + } + } + + // Notify Listeners + ListenerEntry le = listeners; + while (le!=null) { + ArrayInterestSet is = le.getInterestSet(); + if (is.inNotifications()) { + for (int i=0; i oldCis = is.componentInterests; + boolean needUpdates = false; + for (Integer i : oldCis.keySet()) { + needUpdates |= i>=index; + if (needUpdates) break; + } + + if (needUpdates) { + Map newCis = new HashMap(oldCis.size()); + for (Integer i : oldCis.keySet()) + { + Integer oldKey = i; + Integer newKey = i>=index ? i+rcvs.length : i; + InterestSet oldValue = oldCis.get(oldKey); + newCis.put(newKey, oldValue); + } + is.componentInterests = newCis; + } + + // Add component interest listener + /* + for (int i = index; isize()) throw new AccessorException("Index out of bounds"); + assert b.isOpen(); + writeLock(); + try { + Serializer rcs = params.serializerScheme.getSerializer( rcb ); + // Write + b.position(0L); + int oldCount = this.index.size(); + int newCount = oldCount + repeatCount; + if (index>oldCount) throw new AccessorException("Index out of range"); + boolean lastEntry = index == oldCount; + + long componentSize = rcs.getSize(obj); + long size = componentSize * repeatCount; + long pos = getStartPosition(index); + b.position(pos); + b.insertBytes(size, ByteSide.Right); + long p = pos; + for (int i=0; i= index) { + java.lang.ref.Reference value = children.remove(key); + if (value.get()!=null) children.put(key+repeatCount, value); + key = children.lowerKey(key); + } + } + + // Notify Listeners + ListenerEntry le = listeners; + while (le!=null) { + ArrayInterestSet is = le.getInterestSet(); + if (is.inNotifications()) { + for (int i=0; i oldCis = is.componentInterests; + boolean needUpdates = false; + for (Integer i : oldCis.keySet()) { + needUpdates |= i>=index; + if (needUpdates) break; + } + + if (needUpdates) { + Map newCis = new HashMap(oldCis.size()); + for (Integer i : oldCis.keySet()) + { + Integer oldKey = i; + Integer newKey = i>=index ? i+repeatCount : i; + InterestSet oldValue = oldCis.get(oldKey); + newCis.put(newKey, oldValue); + } + is.componentInterests = newCis; + } + + } + + le = le.next; + } + + } catch (IOException e) { + throw new AccessorException(e); + } catch (SerializerConstructionException e) { + throw new AccessorException(e); + } finally { + writeUnlock(); + } + } + + @Override + public void removeNoflush(int index, int count) throws AccessorException { + assert b.isOpen(); + writeLock(); + try { + // Write + boolean lastEntry = index == count; + int oldCount = this.index.size(); + int newCount = oldCount - count; + if (index<0||index+count>oldCount) throw new AccessorException("Index out of bounds"); + long pos = getStartPosition(index); + long lastPos = getStartPosition(index+count-1); + long lastLen = getLength(index, lastPos); + long end = lastPos + lastLen; + long len = end - pos; + b.position(pos); + b.removeBytes(len, ByteSide.Right); + this.index.remove(index, count); + for (int i=index; i> sm = children.subMap(index, true, index+count, false); + for (Entry> e : sm.entrySet()) { + BinaryObject bo = e.getValue().get(); + if (bo==null) continue; + bo.invalidatedNotification(); + } + sm.clear(); + + // Update the keys of consecutive children + if (!lastEntry && !children.isEmpty()) { + Integer lastKey = children.lastKey(); + Integer key = children.higherKey(index); + while (key != null && key <= lastKey) { + java.lang.ref.Reference value = children.remove(key); + if (value.get()!=null) children.put(key-count, value); + key = children.higherKey(key); + } + } + + // Notify Listeners + ListenerEntry le = listeners; + while (le!=null) { + ArrayInterestSet is = le.getInterestSet(); + if (is.inNotifications()) { + ArrayElementRemoved e = new ArrayElementRemoved(index); + emitEvent(le, e); + } + + // Update indices of interest sets + if (is.componentInterests!=null) { + Map oldCis = is.componentInterests; + boolean needUpdates = false; + for (Integer i : oldCis.keySet()) { + needUpdates |= i>=index; + if (needUpdates) break; + } + + if (needUpdates) { + Map newCis = new HashMap(oldCis.size()); + for (Integer i : oldCis.keySet()) + { + Integer oldKey = i; + Integer newKey = i>=index ? i-1 : i; + InterestSet oldValue = oldCis.get(oldKey); + newCis.put(newKey, oldValue); + } + is.componentInterests = newCis; + } + } + le = le.next; + } + + + } catch (IOException e) { + throw new AccessorException( e ); + } finally { + writeUnlock(); + } + } + + @Override + public Object get(int index, Binding valueBinding) throws AccessorException { + assert b.isOpen(); + readLock(); + try { + long pos = getStartPosition(index); + b.position(pos); + Serializer s = params.serializerScheme.getSerializer(valueBinding); + return s.deserialize(b); + } catch (IOException e) { + throw new AccessorException(e); + } catch (SerializerConstructionException e) { + throw new AccessorException(e); + } finally { + readUnlock(); + } + } + + @Override + public void get(int index, Binding valueBinding, Object dst) throws AccessorException { + assert b.isOpen(); + readLock(); + try { + long pos = getStartPosition(index); + b.position(pos); + Serializer s = params.serializerScheme.getSerializer(valueBinding); + s.deserializeTo(b, dst); + } catch (IOException e) { + throw new AccessorException(e); + } catch (SerializerConstructionException e) { + throw new AccessorException(e); + } finally { + readUnlock(); + } + } + + + @SuppressWarnings("unchecked") + @Override + public T getAccessor(int index) + throws AccessorConstructionException { + assert b.isOpen(); + readLock(); + try { + b.position(0L); + int count = b.readInt(); + if (index<0 || index>=count) throw new ReferenceException("Element index ("+index+") out of bounds ("+count+")"); + + // Get existing or create new + BinaryObject sa = getExistingAccessor(index); + if (sa==null) { + long pos = getStartPosition(index); + long len = getLength(index, pos); + + // Instantiate correct sub accessor. + sa = createSubAccessor(cb.type(), pos, len, params); + children.put(index, new WeakReference(sa) ); + + // Add component interest sets + ListenerEntry le = listeners; + while (le!=null) { + ArrayInterestSet is = le.getInterestSet(); + + // Generic element interest + InterestSet gis = is.getComponentInterest(); + if (gis != null) { + try { + ChildReference childPath = ChildReference.concatenate(le.path, new IndexReference(index) ); + sa.addListener(le.listener, gis, childPath, le.executor); + } catch (AccessorException e) { + throw new AccessorConstructionException(e); + } + } + + // Specific element interest + InterestSet cis = is.getComponentInterest(index); + if (cis != null) { + try { + ChildReference childPath = ChildReference.concatenate(le.path, new IndexReference(index) ); + sa.addListener(le.listener, cis, childPath,le.executor); + } catch (AccessorException e) { + throw new AccessorConstructionException(e); + } + } + + // Next listener + le = le.next; + } + + } + + return (T) sa; + } catch (IOException e) { + throw new AccessorConstructionException(e); + } catch (AccessorException e) { + throw new AccessorConstructionException(e); + } finally { + readUnlock(); + } + } + + @SuppressWarnings("unchecked") + @Override + public T getComponent(ChildReference reference) + throws AccessorConstructionException { + if (reference==null) return (T) this; + if (reference instanceof LabelReference) { + LabelReference lr = (LabelReference) reference; + try { + Integer index = new Integer( lr.label ); + Accessor result = getAccessor(index); + if (reference.getChildReference() != null) + result = result.getComponent(reference.getChildReference()); + return (T) result; + } catch ( NumberFormatException nfe ) { + throw new ReferenceException(nfe); + } + } else if (reference instanceof IndexReference) { + IndexReference ref = (IndexReference) reference; + int index = ref.getIndex(); + Accessor result = getAccessor(index); + if (reference.getChildReference() != null) + result = result.getComponent(reference.getChildReference()); + return (T) result; + } throw new ReferenceException(reference.getClass().getName()+" is not a reference of an array"); + } + + @Override + public void getAll(Binding valueBinding, Object[] array) + throws AccessorException { + assert b.isOpen(); + readLock(); + try { + b.position(0L); + int size = b.readInt(); + if (size > array.length) throw new AccessorException("Argument array too short"); + Serializer s = params.serializerScheme.getSerializer(valueBinding); + for (int i=0; i values) + throws AccessorException { + assert b.isOpen(); + readLock(); + try { + b.position(0L); + int size = b.readInt(); + Serializer s = params.serializerScheme.getSerializer(valueBinding); + for (int i=0; ioldSize) { + Object dummy = cb.createDefault(); + int count = newSize-oldSize; + addRepeatNoflush(oldSize, cb, dummy, count); + } + + } catch (BindingException e) { + throw new AccessorException( e ); + } finally { + writeUnlock(); + } + } + + @Override + public void setSize(int newSize) throws AccessorException { + assert b.isOpen(); + writeLock(); + try { + setSizeNoflush(newSize); + b.flush(); + } catch (IOException e) { + throw new AccessorException( e ); + } finally { + writeUnlock(); + } + } + + @Override + public int size() throws AccessorException { + assert b.isOpen(); + readLock(); + return index.size(); + } + + @Override + Event applyLocal(Event e, boolean makeRollback) throws AccessorException { + Event rollback = null; + if (e instanceof ValueAssigned) { + ValueAssigned va = (ValueAssigned) e; + if (makeRollback) rollback = new ValueAssigned(cb, getValue(cb)); + setValueNoflush(va.newValue.getBinding(), va.newValue.getValue()); + } else + if (e instanceof ArrayElementAdded) { + ArrayElementAdded aa = (ArrayElementAdded) e; + addNoflush(aa.index, aa.value.getBinding(), aa.value.getValue()); + if (makeRollback) rollback = new ArrayElementRemoved(aa.index); + } else if (e instanceof ArrayElementRemoved) { + ArrayElementRemoved ar = (ArrayElementRemoved) e; + if (ar.index<0 || ar.index >=size()) throw new AccessorException("Array index out of bounds"); + if (makeRollback) { + Object cv = get(ar.index, cb); + rollback = new ArrayElementAdded(ar.index, new MutableVariant(cb, cv)); + } + removeNoflush(ar.index, 1); + } else { + throw new AccessorException("Cannot apply "+e.getClass().getName()+" to Array"); + } + + return rollback; + } + + @Override + public void add(Binding binding, Object value) throws AccessorException { + assert b.isOpen(); + writeLock(); + try { + addNoflush(binding, value); + b.flush(); + } catch (IOException e) { + throw new AccessorException( e ); + } finally { + writeUnlock(); + } + } + + @Override + public void add(int index, Binding binding, Object value) + throws AccessorException { + assert b.isOpen(); + writeLock(); + try { + addNoflush(index, binding, value); + b.flush(); + } catch (IOException e) { + throw new AccessorException( e ); + } finally { + writeUnlock(); + } + } + + @Override + public void addAll(Binding binding, Object[] values) + throws AccessorException { + assert b.isOpen(); + writeLock(); + try { + addAllNoflush(binding, values); + b.flush(); + } catch (IOException e) { + throw new AccessorException( e ); + } finally { + writeUnlock(); + } + } + + @Override + public void addAll(int index, Binding binding, Object[] values) + throws AccessorException { + assert b.isOpen(); + writeLock(); + try { + addAllNoflush(index, binding, values); + b.flush(); + } catch (IOException e) { + throw new AccessorException( e ); + } finally { + writeUnlock(); + } + } + + @Override + public void remove(int index, int count) throws AccessorException { + assert b.isOpen(); + writeLock(); + try { + removeNoflush(index, count); + b.flush(); + } catch (IOException e) { + throw new AccessorException( e ); + } finally { + writeUnlock(); + } + } + + @Override + public void set(int index, Binding binding, Object value) + throws AccessorException { + assert b.isOpen(); + writeLock(); + try { + setNoflush(index, binding, value); + b.flush(); + } catch (IOException e) { + throw new AccessorException(e); + } finally { + writeUnlock(); + } + } + + @Override + public void close() throws AccessorException { + if ( this.index instanceof CloseableAccessor ) { + CloseableAccessor ca = (CloseableAccessor) this.index; + ca.close(); + } + super.close(); + } + +} +