/*******************************************************************************
* Copyright (c) 2010 Association for Decentralized Information Management in
* Industry THTH ry.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* VTT Technical Research Centre of Finland - initial API and implementation
*******************************************************************************/
package org.simantics.databoard.accessor.binary;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
import org.simantics.databoard.Bindings;
import org.simantics.databoard.accessor.Accessor;
import org.simantics.databoard.accessor.ArrayAccessor;
import org.simantics.databoard.accessor.CloseableAccessor;
import org.simantics.databoard.accessor.StreamAccessor;
import org.simantics.databoard.accessor.error.AccessorConstructionException;
import org.simantics.databoard.accessor.error.AccessorException;
import org.simantics.databoard.accessor.error.ReferenceException;
import org.simantics.databoard.accessor.event.ArrayElementAdded;
import org.simantics.databoard.accessor.event.ArrayElementRemoved;
import org.simantics.databoard.accessor.event.Event;
import org.simantics.databoard.accessor.event.ValueAssigned;
import org.simantics.databoard.accessor.file.FileArrayAccessor;
import org.simantics.databoard.accessor.impl.AccessorParams;
import org.simantics.databoard.accessor.impl.ListenerEntry;
import org.simantics.databoard.accessor.interestset.ArrayInterestSet;
import org.simantics.databoard.accessor.interestset.InterestSet;
import org.simantics.databoard.accessor.reference.ChildReference;
import org.simantics.databoard.accessor.reference.IndexReference;
import org.simantics.databoard.accessor.reference.LabelReference;
import org.simantics.databoard.adapter.AdaptException;
import org.simantics.databoard.binding.ArrayBinding;
import org.simantics.databoard.binding.Binding;
import org.simantics.databoard.binding.error.BindingException;
import org.simantics.databoard.binding.mutable.MutableVariant;
import org.simantics.databoard.serialization.Serializer;
import org.simantics.databoard.serialization.SerializerConstructionException;
import org.simantics.databoard.type.ArrayType;
import org.simantics.databoard.type.Datatype;
import org.simantics.databoard.type.LongType;
import org.simantics.databoard.util.binary.Blob;
import org.simantics.databoard.util.binary.RandomAccessBinary.ByteSide;
/**
* Binary Array is an accessor to a byte-backed array of elements.
*
*
* Note: to increase the random access performance of the array, create
* sub-accessors of its elements.
*
* @author Toni Kalajainen
*/
public class BinaryVariableWidthStreamArray extends BinaryObject implements ArrayAccessor, FileArrayAccessor, ArrayAccessor.CloseableArrayAccessor, StreamAccessor {
/** Accessors to children */
TreeMap> children = new TreeMap>();
Binding cb;
Serializer cs;
Integer constantSize;
ArrayAccessor index;
/**
 * Creates an array accessor over a blob whose element start positions are
 * kept in a separate index accessor.
 *
 * @param parent parent accessor, passed to the super constructor
 * @param blob   backing binary, passed to the super constructor
 * @param type   datatype of this accessor; cast to {@link ArrayType}
 * @param params accessor parameters providing the binding and serializer schemes
 * @param index  accessor of element start positions; must be non-null and have
 *               a {@link LongType} component type
 * @throws AccessorConstructionException if {@code index} is null or is not a Long[] accessor
 */
public BinaryVariableWidthStreamArray(BinaryObject parent, Blob blob, Datatype type, AccessorParams params, ArrayAccessor index)
throws AccessorConstructionException
{
    super(parent, blob, type, params);
    final ArrayType arrayType = (ArrayType) type;
    this.cb = params.bindingScheme.getBindingUnchecked(arrayType.componentType);
    this.cs = params.serializerScheme.getSerializerUnchecked( this.cb );
    this.constantSize = this.cs.getConstantSize();

    // The index is validated only after the component binding/serializer have been resolved.
    final boolean isLongIndex = index != null && (index.type().componentType instanceof LongType);
    if (!isLongIndex) {
        throw new AccessorConstructionException("Index must be Long[]");
    }
    this.index = index;
}
/**
 * Returns the datatype of this accessor as an {@link ArrayType}.
 *
 * @return the accessor's type, cast to ArrayType
 */
public ArrayType type() {
    final ArrayType arrayType = (ArrayType) type;
    return arrayType;
}
/**
 * Looks up an already created sub-accessor from the children map.
 * A map entry whose referent has been cleared is left in place; this
 * method does not prune it.
 *
 * @param index element index
 * @return the cached sub-accessor, or null if there is no entry or its
 *         reference no longer holds an accessor
 */
BinaryObject getExistingAccessor(int index)
{
    final java.lang.ref.Reference<?> cached = children.get(index);
    return cached == null ? null : (BinaryObject) cached.get();
}
/**
 * Gets the start position of an element in the blob.
 * For {@code fieldIndex == size} (one past the last element) the length of
 * the blob is returned; otherwise the position is read from the index accessor.
 *
 * @param fieldIndex element index in the range 0..size (inclusive)
 * @return start position of the element
 * @throws AccessorException if the index is out of bounds or reading the blob length fails
 */
long getStartPosition(int fieldIndex) throws AccessorException {
    final int count = this.index.size();
    if ( fieldIndex < 0 || fieldIndex > count ) {
        throw new AccessorException("Index out of bounds");
    }
    if ( fieldIndex != count ) {
        return (Long) index.get(fieldIndex, Bindings.LONG);
    }
    // One past the last element: the position is the end of the blob
    try {
        return b.length();
    } catch (IOException ioe) {
        throw new AccessorException( ioe );
    }
}
/**
 * Gets the length in bytes of an element, computed from the index accessor.
 * The last element extends to the end of the blob; any other element extends
 * to the start position of its successor.
 *
 * @param index element index
 * @param pos   not used by this implementation
 * @return element length in bytes, or 0 if {@code index} is out of bounds
 * @throws AccessorException if the index accessor or the blob cannot be read
 */
long getLength(int index, long pos) throws AccessorException {
    final int count = this.index.size();
    if ( index < 0 || index >= count ) {
        return 0;
    }
    final int last = count - 1;
    if ( index != last ) {
        final long nextStart = (Long) this.index.get(index+1, Bindings.LONG);
        final long thisStart = (Long) this.index.get(index, Bindings.LONG);
        return nextStart - thisStart;
    }
    try {
        return b.length() - (Long) this.index.get(last, Bindings.LONG);
    } catch (IOException ioe) {
        throw new AccessorException( ioe );
    }
}
/**
 * Replaces the element at {@code index} with {@code rcv}, serialized with the
 * serializer of {@code rcb}. If {@code index} equals the current size, the
 * call is delegated to {@code addNoflush(count, rcb, rcv)}.
 *
 * The byte region of the element is grown or shrunk by the difference between
 * the new and the old serialized size before the value is written. Listeners
 * whose interest set covers {@code index} receive a {@link ValueAssigned} event.
 *
 * NOTE(review): the section that updates the index and the child accessors
 * after a size change is truncated in this copy of the file (the loop header
 * is incomplete and a stray comment terminator follows it); restore it from
 * version control before changing this method.
 *
 * @param index element index, 0..size (inclusive)
 * @param rcb binding of the value
 * @param rcv the value
 * @throws AccessorException on I/O, adaptation or serializer construction failure
 */
@Override
public void setNoflush(int index, Binding rcb, Object rcv)
throws AccessorException {
assert b.isOpen();
writeLock();
try {
int count = size();
// Assigning one past the last element is an append
if ( index==count ) {
addNoflush(count, rcb, rcv);
return;
}
// Write
Serializer rcs = params.serializerScheme.getSerializer( rcb );
long pos = getStartPosition(index);
long oldSize = getLength(index, pos);
long newSize = rcs.getSize(rcv, null);
b.position(pos);
// Resize the element's byte region so that the new value fits exactly
long diff = newSize - oldSize;
if (diff>0) {
b.insertBytes( newSize - oldSize, ByteSide.Right );
} else if (diff<0) {
b.removeBytes( oldSize - newSize, ByteSide.Right );
}
b.position(pos);
rcs.serialize(b, null, rcv);
// Update index of the higher indices
// NOTE(review): the following lines are incomplete in this copy of the file
if ( diff!= 0 ) {
for (int i=index+1; i> pm = children.subMap(index+1, true, lastKey, true);
for (Entry> e : pm.entrySet()) {
BinaryObject sa_ = e.getValue().get();
if (sa_ == null) continue;
sa_.b.setPositionInSource( sa_.b.getStartPositionInSourceBinary() + diff, sa_.b.length());
}
}
}*/
// Notify
ListenerEntry le = listeners;
while (le!=null) {
ArrayInterestSet is = le.getInterestSet();
if (is.inNotificationsOf(index)) {
MutableVariant newValue = null;
// The value is attached to the event only if the listener asked for values;
// mutable values are cloned
if (is.inValues()) newValue = new MutableVariant(rcb, rcb.isImmutable() ? rcv : rcb.clone(rcv));
Event e = new ValueAssigned(new IndexReference(index), newValue);
emitEvent(le, e);
}
le = le.next;
}
} catch (IOException e) {
throw new AccessorException(e);
} catch (AdaptException e) {
throw new AccessorException(e);
} catch (SerializerConstructionException e) {
throw new AccessorException(e);
} finally {
writeUnlock();
}
}
/**
* Set all values.
*
* The blob length is set to the serialized size of {@code newArray} and the
* index accessor is resized to the new element count.
*
* NOTE(review): this copy of the file is truncated inside this method. The
* loop headers below are incomplete, and the statements that follow the
* second incomplete header (inserting a single element, shifting child keys,
* emitting ArrayElementAdded) appear to belong to a separate single-element
* insert method whose header is missing. Restore the text from version
* control before changing this code.
*
* @param arrayBinding binding of the new array; cast to ArrayBinding
* @param newArray the new array value
*/
@Override
public void setValueNoflush(Binding arrayBinding, Object newArray)
throws AccessorException {
assert b.isOpen();
writeLock();
try {
// Write
ArrayBinding rb = ((ArrayBinding)arrayBinding);
Binding rcb = rb.getComponentBinding();
Serializer rcs = params.serializerScheme.getSerializer( rcb );
int oldCount = index.size();
int newCount = rb.size(newArray);
b.setLength( params.serializerScheme.getSerializer( rb ).getSize(newArray) );
index.setSize(newCount);
// Serialize
// NOTE(review): incomplete loop header; the local "index" declared here shadows the index field
for (int index=0; index=newCount; index--) {
BinaryObject sa = getExistingAccessor(index);
// Sub-accessor of a removed element is invalidated and dropped from the children map
if (sa!=null) {
sa.invalidatedNotification();
children.remove(index);
sa = null;
}
// Notify changes
ListenerEntry le = listeners;
while (le!=null) {
ArrayInterestSet is = le.getInterestSet();
if (is.inNotificationsOf(index)) {
Event e = new ArrayElementRemoved(index);
emitEvent(le, e);
}
le = le.next;
}
}
// Notify new assignment
if (listeners!=null) {
// NOTE(review): incomplete line; the code from here on inserts one element at "index"
for (int index=0; indexoldCount) throw new AccessorException("Index out of range");
long pos = getStartPosition(index);
int size = rcs.getSize(rcv);
b.position(pos);
b.insertBytes(size, ByteSide.Left);
rcs.serialize(b, null, rcv);
// Record the start position of the inserted element in the index accessor
this.index.add(index, Bindings.LONG, pos);
// NOTE(review): incomplete loop header
for (int i=index+1; i= index) {
// Move the child accessor one slot up; entries with a cleared reference are dropped
java.lang.ref.Reference value = children.remove(key);
if (value.get()!=null) children.put(key+1, value);
key = children.lowerKey(key);
}
}
// Notify Listeners
ListenerEntry le = listeners;
while (le!=null) {
ArrayInterestSet is = le.getInterestSet();
if (is.inNotifications()) {
MutableVariant newValue = null;
if (is.inValues()) newValue = new MutableVariant(rcb, rcb.isImmutable() ? rcv : rcb.clone(rcv));
ArrayElementAdded e = new ArrayElementAdded(index, newValue);
emitEvent(le, e);
}
// Update indices of interest sets
if (is.componentInterests!=null) {
Map oldCis = is.componentInterests;
// A remap is needed only if some key is at or after the insertion index
boolean needUpdates = false;
for (Integer i : oldCis.keySet()) {
needUpdates |= i>=index;
if (needUpdates) break;
}
if (needUpdates) {
// Keys at or after the insertion index are shifted up by one
Map newCis = new HashMap(oldCis.size());
for (Integer i : oldCis.keySet())
{
Integer oldKey = i;
Integer newKey = i>=index ? i+1 : i;
InterestSet oldValue = oldCis.get(oldKey);
newCis.put(newKey, oldValue);
}
is.componentInterests = newCis;
}
}
// Add component interest listener
/*
boolean hadSa = getExistingAccessor(index) != null;
if (!hadSa) {
// Add component interest listener
InterestSet cis = is.getComponentInterest();
if (cis != null) {
Accessor sa = getAccessor(index);
}
cis = is.getComponentInterest(index);
if (cis != null) {
Accessor sa = getAccessor(index);
}
} */
le = le.next;
}
} catch (IOException e) {
throw new AccessorException(e);
} catch (AdaptException e) {
throw new AccessorException(e);
} catch (SerializerConstructionException e) {
throw new AccessorException(e);
} finally {
writeUnlock();
}
}
/**
 * Appends a value to the end of the array without flushing the blob.
 *
 * @param binding binding of the value
 * @param value the value to append
 * @throws AccessorException if the size cannot be read or the insert fails
 */
@Override
public void addNoflush(Binding binding, Object value)
throws AccessorException {
    final int end = size();
    addNoflush(end, binding, value);
}
/**
 * Appends values to the end of the array without flushing the blob.
 *
 * @param binding binding of the values
 * @param values the values to append
 * @throws AccessorException if the size cannot be read or the insert fails
 */
@Override
public void addAllNoflush(Binding binding, Object[] values)
throws AccessorException {
    final int end = size();
    addAllNoflush(end, binding, values);
}
/**
 * Inserts the values {@code rcvs} at {@code index} without flushing the blob.
 *
 * NOTE(review): this copy of the file is truncated inside this method: several
 * loop headers are incomplete, and a block comment that is opened near the
 * middle of the block has no visible terminator. The statements after that
 * point use {@code repeatCount} and {@code obj} as if they were parameters,
 * so they appear to belong to a separate repeat-insert method whose header is
 * missing. Restore the text from version control before changing this code.
 *
 * NOTE(review): the visible part reads the old element count as an int from
 * position 0 of the blob, whereas the later part uses the size of the index
 * accessor; confirm which source is intended.
 *
 * @param index insertion index, 0..size (inclusive)
 * @param rcb binding of the values
 * @param rcvs the values to insert
 * @throws AccessorException if the index is out of bounds or the write fails
 */
@Override
public void addAllNoflush(int index, Binding rcb, Object[] rcvs)
throws AccessorException {
if (index<0||index>size()) throw new AccessorException("Index out of bounds");
assert b.isOpen();
writeLock();
try {
Serializer rcs = params.serializerScheme.getSerializer( rcb );
// Write
b.position(0L);
int repeatCount = rcvs.length;
int oldCount = b.readInt();
int newCount = oldCount + rcvs.length;
if (index>oldCount) throw new AccessorException("Index out of range");
boolean lastEntry = index == oldCount;
long size = 0;
// NOTE(review): incomplete loop header
for (int i=0; i= index) {
// Move the child accessor up by the number of inserted values; cleared references are dropped
java.lang.ref.Reference value = children.remove(key);
if (value.get()!=null) children.put(key+rcvs.length, value);
key = children.lowerKey(key);
}
}
// Notify Listeners
ListenerEntry le = listeners;
while (le!=null) {
ArrayInterestSet is = le.getInterestSet();
if (is.inNotifications()) {
// NOTE(review): incomplete loop header
for (int i=0; i oldCis = is.componentInterests;
boolean needUpdates = false;
for (Integer i : oldCis.keySet()) {
needUpdates |= i>=index;
if (needUpdates) break;
}
if (needUpdates) {
// Keys at or after the insertion index are shifted up by the number of inserted values
Map newCis = new HashMap(oldCis.size());
for (Integer i : oldCis.keySet())
{
Integer oldKey = i;
Integer newKey = i>=index ? i+rcvs.length : i;
InterestSet oldValue = oldCis.get(oldKey);
newCis.put(newKey, oldValue);
}
is.componentInterests = newCis;
}
// Add component interest listener
/*
for (int i = index; isize()) throw new AccessorException("Index out of bounds");
assert b.isOpen();
writeLock();
try {
Serializer rcs = params.serializerScheme.getSerializer( rcb );
// Write
b.position(0L);
int oldCount = this.index.size();
int newCount = oldCount + repeatCount;
if (index>oldCount) throw new AccessorException("Index out of range");
boolean lastEntry = index == oldCount;
long componentSize = rcs.getSize(obj);
long size = componentSize * repeatCount;
long pos = getStartPosition(index);
b.position(pos);
b.insertBytes(size, ByteSide.Right);
long p = pos;
for (int i=0; i= index) {
java.lang.ref.Reference value = children.remove(key);
if (value.get()!=null) children.put(key+repeatCount, value);
key = children.lowerKey(key);
}
}
// Notify Listeners
ListenerEntry le = listeners;
while (le!=null) {
ArrayInterestSet is = le.getInterestSet();
if (is.inNotifications()) {
for (int i=0; i oldCis = is.componentInterests;
boolean needUpdates = false;
for (Integer i : oldCis.keySet()) {
needUpdates |= i>=index;
if (needUpdates) break;
}
if (needUpdates) {
Map newCis = new HashMap(oldCis.size());
for (Integer i : oldCis.keySet())
{
Integer oldKey = i;
Integer newKey = i>=index ? i+repeatCount : i;
InterestSet oldValue = oldCis.get(oldKey);
newCis.put(newKey, oldValue);
}
is.componentInterests = newCis;
}
}
le = le.next;
}
} catch (IOException e) {
throw new AccessorException(e);
} catch (SerializerConstructionException e) {
throw new AccessorException(e);
} finally {
writeUnlock();
}
}
/**
 * Removes {@code count} elements starting at {@code index} without flushing
 * the blob. The bytes of the removed range are cut from the blob, the
 * corresponding entries are removed from the index accessor, sub-accessors of
 * the removed elements are invalidated, and listeners are notified.
 *
 * NOTE(review): this copy of the file is truncated in the middle of this
 * method (an incomplete loop header is fused with the declaration of the
 * sub-map of removed children). Restore the text from version control before
 * changing this code.
 *
 * @param index index of the first element to remove
 * @param count number of elements to remove
 * @throws AccessorException if the range is out of bounds or the write fails
 */
@Override
public void removeNoflush(int index, int count) throws AccessorException {
assert b.isOpen();
writeLock();
try {
// Write
// NOTE(review): compares the start index with the removal count, not with the
// element count; confirm this really detects "removed range is at the end"
boolean lastEntry = index == count;
int oldCount = this.index.size();
int newCount = oldCount - count;
if (index<0||index+count>oldCount) throw new AccessorException("Index out of bounds");
long pos = getStartPosition(index);
long lastPos = getStartPosition(index+count-1);
// NOTE(review): the length is queried for element "index" although lastPos is the
// start of element index+count-1; for count > 1 verify that this is intended
long lastLen = getLength(index, lastPos);
long end = lastPos + lastLen;
long len = end - pos;
b.position(pos);
b.removeBytes(len, ByteSide.Right);
this.index.remove(index, count);
// NOTE(review): incomplete line; sm is the sub-map of children in [index, index+count)
for (int i=index; i> sm = children.subMap(index, true, index+count, false);
for (Entry> e : sm.entrySet()) {
BinaryObject bo = e.getValue().get();
if (bo==null) continue;
bo.invalidatedNotification();
}
// Clearing the sub-map view removes the entries of the removed elements from children
sm.clear();
// Update the keys of consecutive children
if (!lastEntry && !children.isEmpty()) {
Integer lastKey = children.lastKey();
Integer key = children.higherKey(index);
while (key != null && key <= lastKey) {
// Move the child accessor down by count; entries with a cleared reference are dropped
java.lang.ref.Reference value = children.remove(key);
if (value.get()!=null) children.put(key-count, value);
key = children.higherKey(key);
}
}
// Notify Listeners
ListenerEntry le = listeners;
while (le!=null) {
ArrayInterestSet is = le.getInterestSet();
if (is.inNotifications()) {
// NOTE(review): one event is emitted per listener regardless of count; confirm
ArrayElementRemoved e = new ArrayElementRemoved(index);
emitEvent(le, e);
}
// Update indices of interest sets
if (is.componentInterests!=null) {
Map oldCis = is.componentInterests;
boolean needUpdates = false;
for (Integer i : oldCis.keySet()) {
needUpdates |= i>=index;
if (needUpdates) break;
}
if (needUpdates) {
Map newCis = new HashMap(oldCis.size());
for (Integer i : oldCis.keySet())
{
Integer oldKey = i;
// NOTE(review): keys are shifted down by 1 although count elements were removed; verify
Integer newKey = i>=index ? i-1 : i;
InterestSet oldValue = oldCis.get(oldKey);
newCis.put(newKey, oldValue);
}
is.componentInterests = newCis;
}
}
le = le.next;
}
} catch (IOException e) {
throw new AccessorException( e );
} finally {
writeUnlock();
}
}
/**
 * Reads the element at {@code index} by deserializing it from the blob with
 * the serializer of {@code valueBinding}.
 *
 * @param index element index
 * @param valueBinding binding used for deserialization
 * @return the deserialized value
 * @throws AccessorException if the index is out of bounds, or on I/O or
 *         serializer construction failure
 */
@Override
public Object get(int index, Binding valueBinding) throws AccessorException {
    assert b.isOpen();
    readLock();
    try {
        b.position( getStartPosition(index) );
        return params.serializerScheme.getSerializer(valueBinding).deserialize(b);
    } catch (IOException ioe) {
        throw new AccessorException(ioe);
    } catch (SerializerConstructionException sce) {
        throw new AccessorException(sce);
    } finally {
        readUnlock();
    }
}
/**
 * Reads the element at {@code index} into an existing object by deserializing
 * it from the blob with the serializer of {@code valueBinding}.
 *
 * @param index element index
 * @param valueBinding binding used for deserialization
 * @param dst object that receives the deserialized value
 * @throws AccessorException if the index is out of bounds, or on I/O or
 *         serializer construction failure
 */
@Override
public void get(int index, Binding valueBinding, Object dst) throws AccessorException {
    assert b.isOpen();
    readLock();
    try {
        b.position( getStartPosition(index) );
        params.serializerScheme.getSerializer(valueBinding).deserializeTo(b, dst);
    } catch (IOException ioe) {
        throw new AccessorException(ioe);
    } catch (SerializerConstructionException sce) {
        throw new AccessorException(sce);
    } finally {
        readUnlock();
    }
}
/**
 * Returns the sub-accessor of the element at {@code index}. An accessor that
 * is still held in the children map is reused; otherwise a new one is created
 * with createSubAccessor over the element's byte range, stored in the children
 * map behind a weak reference, and the listeners of this accessor that have a
 * generic or an index-specific component interest are added to it.
 *
 * NOTE(review): the declaration of the type parameter T is not present in this
 * copy of the file; the return value is cast to T.
 *
 * NOTE(review): the element count used for the bounds check is read as an int
 * from position 0 of the blob, while size() reports the size of the index
 * accessor; confirm that both agree for this storage layout.
 *
 * @param index element index
 * @return sub-accessor of the element
 * @throws AccessorConstructionException if the index is out of bounds
 *         (ReferenceException), or on I/O or accessor failure
 */
@SuppressWarnings("unchecked")
@Override
public T getAccessor(int index)
throws AccessorConstructionException {
assert b.isOpen();
readLock();
try {
b.position(0L);
int count = b.readInt();
if (index<0 || index>=count) throw new ReferenceException("Element index ("+index+") out of bounds ("+count+")");
// Get existing or create new
BinaryObject sa = getExistingAccessor(index);
if (sa==null) {
long pos = getStartPosition(index);
long len = getLength(index, pos);
// Instantiate correct sub accessor.
sa = createSubAccessor(cb.type(), pos, len, params);
// Only a weak reference is stored in the children map
children.put(index, new WeakReference(sa) );
// Add component interest sets
ListenerEntry le = listeners;
while (le!=null) {
ArrayInterestSet is = le.getInterestSet();
// Generic element interest
InterestSet gis = is.getComponentInterest();
if (gis != null) {
try {
// The listener path of the child is this listener's path extended with the element index
ChildReference childPath = ChildReference.concatenate(le.path, new IndexReference(index) );
sa.addListener(le.listener, gis, childPath, le.executor);
} catch (AccessorException e) {
throw new AccessorConstructionException(e);
}
}
// Specific element interest
InterestSet cis = is.getComponentInterest(index);
if (cis != null) {
try {
ChildReference childPath = ChildReference.concatenate(le.path, new IndexReference(index) );
sa.addListener(le.listener, cis, childPath,le.executor);
} catch (AccessorException e) {
throw new AccessorConstructionException(e);
}
}
// Next listener
le = le.next;
}
}
return (T) sa;
} catch (IOException e) {
throw new AccessorConstructionException(e);
} catch (AccessorException e) {
throw new AccessorConstructionException(e);
} finally {
readUnlock();
}
}
/**
 * Resolves a child reference to an accessor.
 * A null reference resolves to this accessor. A {@link LabelReference} is
 * interpreted as a decimal element index and an {@link IndexReference} as an
 * element index; the element's sub-accessor is fetched with getAccessor and,
 * if the reference has a child reference, resolution continues from there.
 *
 * @param reference reference to resolve, or null for this accessor
 * @return the referred accessor
 * @throws AccessorConstructionException if the accessor cannot be created;
 *         a {@link ReferenceException} is thrown if a label is not a number
 *         or the reference type is not applicable to arrays
 */
@SuppressWarnings("unchecked")
@Override
public <T extends Accessor> T getComponent(ChildReference reference)
throws AccessorConstructionException {
    if (reference==null) return (T) this;
    if (reference instanceof LabelReference) {
        LabelReference lr = (LabelReference) reference;
        try {
            // Integer.parseInt replaces the deprecated Integer(String) constructor;
            // both throw NumberFormatException on a malformed label
            int index = Integer.parseInt( lr.label );
            Accessor result = getAccessor(index);
            ChildReference rest = reference.getChildReference();
            if (rest != null)
                result = result.getComponent(rest);
            return (T) result;
        } catch ( NumberFormatException nfe ) {
            throw new ReferenceException(nfe);
        }
    } else if (reference instanceof IndexReference) {
        IndexReference ref = (IndexReference) reference;
        int index = ref.getIndex();
        Accessor result = getAccessor(index);
        ChildReference rest = reference.getChildReference();
        if (rest != null)
            result = result.getComponent(rest);
        return (T) result;
    }
    throw new ReferenceException(reference.getClass().getName()+" is not a reference of an array");
}
/**
 * Reads all elements into {@code array} using the serializer of
 * {@code valueBinding}.
 *
 * NOTE(review): this copy of the file is truncated inside this method. The
 * loop headers are incomplete, and the block continues with what appear to be
 * the remains of a collection-based read and of a resize routine (it grows the
 * array with default component values via addRepeatNoflush) whose headers are
 * missing. Restore the text from version control before changing this code.
 *
 * NOTE(review): the element count is read as an int from position 0 of the
 * blob, while size() reports the size of the index accessor; confirm.
 *
 * @param valueBinding binding used for deserialization
 * @param array destination array; must be at least as long as the element count
 * @throws AccessorException if the array is too short or reading fails
 */
@Override
public void getAll(Binding valueBinding, Object[] array)
throws AccessorException {
assert b.isOpen();
readLock();
try {
b.position(0L);
int size = b.readInt();
if (size > array.length) throw new AccessorException("Argument array too short");
Serializer s = params.serializerScheme.getSerializer(valueBinding);
// NOTE(review): incomplete loop header, fused with the parameter list of another method
for (int i=0; i values)
throws AccessorException {
assert b.isOpen();
readLock();
try {
b.position(0L);
int size = b.readInt();
Serializer s = params.serializerScheme.getSerializer(valueBinding);
// NOTE(review): incomplete loop header, fused with a size comparison of another method
for (int i=0; ioldSize) {
// Growing: the new slots are filled with the default value of the component binding
Object dummy = cb.createDefault();
int count = newSize-oldSize;
addRepeatNoflush(oldSize, cb, dummy, count);
}
} catch (BindingException e) {
throw new AccessorException( e );
} finally {
writeUnlock();
}
}
/**
 * Resizes the array with setSizeNoflush and then flushes the blob, all under
 * the write lock.
 *
 * @param newSize new element count
 * @throws AccessorException if resizing fails or the flush raises an I/O error
 */
@Override
public void setSize(int newSize) throws AccessorException {
    assert b.isOpen();
    writeLock();
    try {
        setSizeNoflush(newSize);
        b.flush();
    } catch (IOException ioe) {
        // I/O failures are reported as accessor failures
        throw new AccessorException( ioe );
    } finally {
        writeUnlock();
    }
}
/**
 * Returns the number of elements, which is the size of the index accessor.
 *
 * The read lock is released in a finally block. Previously the lock was
 * acquired and never released, so every call to this method (including the
 * calls made by the add/set methods of this class) left a read lock held.
 *
 * @return element count
 * @throws AccessorException if the index accessor cannot report its size
 */
@Override
public int size() throws AccessorException {
    assert b.isOpen();
    readLock();
    try {
        return index.size();
    } finally {
        readUnlock();
    }
}
/**
 * Applies a single event to this array without flushing, and optionally
 * builds the event that reverts it.
 *
 * Supported events: {@link ValueAssigned} (replaces the whole value),
 * {@link ArrayElementAdded} (inserts one element) and
 * {@link ArrayElementRemoved} (removes one element).
 *
 * @param e event to apply
 * @param makeRollback true to create a rollback event
 * @return the rollback event, or null if {@code makeRollback} is false
 * @throws AccessorException if the event type is not supported, a removal
 *         index is out of bounds, or the modification fails
 */
@Override
Event applyLocal(Event e, boolean makeRollback) throws AccessorException {
    if (e instanceof ValueAssigned) {
        final ValueAssigned assignment = (ValueAssigned) e;
        // The rollback captures the current value before it is overwritten
        final Event undo = makeRollback ? new ValueAssigned(cb, getValue(cb)) : null;
        setValueNoflush(assignment.newValue.getBinding(), assignment.newValue.getValue());
        return undo;
    }

    if (e instanceof ArrayElementAdded) {
        final ArrayElementAdded added = (ArrayElementAdded) e;
        addNoflush(added.index, added.value.getBinding(), added.value.getValue());
        return makeRollback ? new ArrayElementRemoved(added.index) : null;
    }

    if (e instanceof ArrayElementRemoved) {
        final ArrayElementRemoved removed = (ArrayElementRemoved) e;
        if (removed.index<0 || removed.index >=size()) {
            throw new AccessorException("Array index out of bounds");
        }
        Event undo = null;
        if (makeRollback) {
            // The rollback re-adds the element's current value at the same index
            final Object oldValue = get(removed.index, cb);
            undo = new ArrayElementAdded(removed.index, new MutableVariant(cb, oldValue));
        }
        removeNoflush(removed.index, 1);
        return undo;
    }

    throw new AccessorException("Cannot apply "+e.getClass().getName()+" to Array");
}
/**
 * Appends a value to the end of the array and flushes the blob, all under the
 * write lock.
 *
 * @param binding binding of the value
 * @param value the value to append
 * @throws AccessorException if the insert fails or the flush raises an I/O error
 */
@Override
public void add(Binding binding, Object value) throws AccessorException {
    assert b.isOpen();
    writeLock();
    try {
        addNoflush(binding, value);
        try {
            b.flush();
        } catch (IOException ioe) {
            throw new AccessorException( ioe );
        }
    } finally {
        writeUnlock();
    }
}
/**
 * Inserts a value at {@code index} and flushes the blob, all under the write
 * lock.
 *
 * @param index insertion index
 * @param binding binding of the value
 * @param value the value to insert
 * @throws AccessorException if the insert fails or the flush raises an I/O error
 */
@Override
public void add(int index, Binding binding, Object value)
throws AccessorException {
    assert b.isOpen();
    writeLock();
    try {
        addNoflush(index, binding, value);
        try {
            b.flush();
        } catch (IOException ioe) {
            throw new AccessorException( ioe );
        }
    } finally {
        writeUnlock();
    }
}
/**
 * Appends values to the end of the array and flushes the blob, all under the
 * write lock.
 *
 * @param binding binding of the values
 * @param values the values to append
 * @throws AccessorException if the insert fails or the flush raises an I/O error
 */
@Override
public void addAll(Binding binding, Object[] values)
throws AccessorException {
    assert b.isOpen();
    writeLock();
    try {
        addAllNoflush(binding, values);
        try {
            b.flush();
        } catch (IOException ioe) {
            throw new AccessorException( ioe );
        }
    } finally {
        writeUnlock();
    }
}
/**
 * Inserts values at {@code index} and flushes the blob, all under the write
 * lock.
 *
 * @param index insertion index
 * @param binding binding of the values
 * @param values the values to insert
 * @throws AccessorException if the insert fails or the flush raises an I/O error
 */
@Override
public void addAll(int index, Binding binding, Object[] values)
throws AccessorException {
    assert b.isOpen();
    writeLock();
    try {
        addAllNoflush(index, binding, values);
        try {
            b.flush();
        } catch (IOException ioe) {
            throw new AccessorException( ioe );
        }
    } finally {
        writeUnlock();
    }
}
/**
 * Removes {@code count} elements starting at {@code index} and flushes the
 * blob, all under the write lock.
 *
 * @param index index of the first element to remove
 * @param count number of elements to remove
 * @throws AccessorException if the removal fails or the flush raises an I/O error
 */
@Override
public void remove(int index, int count) throws AccessorException {
    assert b.isOpen();
    writeLock();
    try {
        removeNoflush(index, count);
        try {
            b.flush();
        } catch (IOException ioe) {
            throw new AccessorException( ioe );
        }
    } finally {
        writeUnlock();
    }
}
/**
 * Assigns the element at {@code index} and flushes the blob, all under the
 * write lock.
 *
 * @param index element index
 * @param binding binding of the value
 * @param value the new value
 * @throws AccessorException if the assignment fails or the flush raises an I/O error
 */
@Override
public void set(int index, Binding binding, Object value)
throws AccessorException {
    assert b.isOpen();
    writeLock();
    try {
        setNoflush(index, binding, value);
        try {
            b.flush();
        } catch (IOException ioe) {
            throw new AccessorException(ioe);
        }
    } finally {
        writeUnlock();
    }
}
/**
 * Closes the index accessor, if it is a {@link CloseableAccessor}, and then
 * closes this accessor through the super class.
 *
 * @throws AccessorException if closing the index accessor or this accessor fails
 */
@Override
public void close() throws AccessorException {
    final ArrayAccessor positions = this.index;
    if ( positions instanceof CloseableAccessor ) {
        ((CloseableAccessor) positions).close();
    }
    super.close();
}
}