X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.databoard%2Fsrc%2Forg%2Fsimantics%2Fdataboard%2Faccessor%2Fwire%2FWireServer.java;h=a6490707c08c57520d6c7478521accc8d9572046;hp=8dd7fe27c1d02c15baabe6e8ba262bc97f75d7dd;hb=refs%2Fchanges%2F38%2F238%2F2;hpb=24e2b34260f219f0d1644ca7a138894980e25b14 diff --git a/bundles/org.simantics.databoard/src/org/simantics/databoard/accessor/wire/WireServer.java b/bundles/org.simantics.databoard/src/org/simantics/databoard/accessor/wire/WireServer.java index 8dd7fe27c..a6490707c 100644 --- a/bundles/org.simantics.databoard/src/org/simantics/databoard/accessor/wire/WireServer.java +++ b/bundles/org.simantics.databoard/src/org/simantics/databoard/accessor/wire/WireServer.java @@ -1,619 +1,619 @@ -/******************************************************************************* - * Copyright (c) 2010- Association for Decentralized Information Management in - * Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ -package org.simantics.databoard.accessor.wire; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import org.simantics.databoard.Bindings; -import org.simantics.databoard.Methods; -import org.simantics.databoard.accessor.Accessor; -import org.simantics.databoard.accessor.Accessor.Listener; -import org.simantics.databoard.accessor.ArrayAccessor; -import org.simantics.databoard.accessor.MapAccessor; -import org.simantics.databoard.accessor.OptionalAccessor; -import org.simantics.databoard.accessor.RecordAccessor; -import org.simantics.databoard.accessor.UnionAccessor; -import org.simantics.databoard.accessor.error.AccessorConstructionException; -import org.simantics.databoard.accessor.error.AccessorException; -import org.simantics.databoard.accessor.event.Event; -import org.simantics.databoard.accessor.impl.AccessorParams; -import org.simantics.databoard.accessor.interestset.InterestSet; -import org.simantics.databoard.accessor.reference.ChildReference; -import org.simantics.databoard.adapter.AdaptException; -import org.simantics.databoard.annotations.Optional; -import org.simantics.databoard.binding.ArrayBinding; -import org.simantics.databoard.binding.Binding; -import org.simantics.databoard.binding.MapBinding; -import org.simantics.databoard.binding.RecordBinding; -import org.simantics.databoard.binding.UnionBinding; -import org.simantics.databoard.binding.error.BindingConstructionException; -import org.simantics.databoard.binding.impl.ObjectArrayBinding; -import org.simantics.databoard.binding.mutable.MutableVariant; -import org.simantics.databoard.method.MethodInterface; -import org.simantics.databoard.method.MethodInterface.Method; -import org.simantics.databoard.method.MethodNotSupportedException; -import org.simantics.databoard.method.MethodTypeBinding; -import org.simantics.databoard.method.TcpConnection; -import org.simantics.databoard.method.TcpConnection.ConnectionListener; -import org.simantics.databoard.type.ArrayType; -import org.simantics.databoard.type.Component; -import org.simantics.databoard.type.Datatype; -import org.simantics.databoard.type.MapType; -import org.simantics.databoard.type.UnionType; -import org.simantics.databoard.util.BijectionMap; - -/** - * WireServer exposes an accessor over TCP/IP socket. - * - * @author Toni Kalajainen - */ -public class WireServer implements IWireServer { - - Accessor accessor; - Map clients = Collections.synchronizedMap( new HashMap() ); - MethodInterface mi; - AccessorParams params = AccessorParams.DEFAULT; - - public WireServer(Accessor accessor) { - this.accessor = accessor; - try { - this.mi = Methods.bindInterface(IWireServer.class, this); - } catch (BindingConstructionException e) { - throw new RuntimeException(e); - } - } - - public MethodInterface getMethodInterface() { - return mi; - } - - static class ClientRecord { - TcpConnection connection; - MethodInterface clientMethodInterface; - Method onEventsMethod; - IWireClient clientMethods; - int accIdCounter = 0; - int listIdCounter = 0; - BijectionMap accessorTable = new BijectionMap(); - Map listenerTable = new HashMap(); - - public void dispose() { - } - } - - static class AccRecord { - int accId; - Accessor accessor; - ChildReference reference; - Datatype type; - Binding binding; - } - - static class LisRecord { - int lisId; - int accId; - InterestSet is; - Listener listener; - } - - static class OnEventsRequest { - public @Optional Integer arg1; - public @Optional Event[] arg2; - } - - /** - * Get or create new client handler associated with current thread. - * The result value depends on the current thread. - * - * @return client handler - * @throws WireException - */ - public ClientRecord getClient() throws WireException { - TcpConnection connection = TcpConnection.getCurrentConnection(); - if (connection == null) throw new WireException("Internal Error. This method must be invoked in ConnectionThread"); - try { - synchronized(clients) { - ClientRecord handler = clients.get( connection ); - if (handler == null) { - - RecordBinding requestBinding = (RecordBinding) Bindings.getBindingUnchecked( OnEventsRequest.class ); - Binding responseBinding = Bindings.INTEGER; - UnionType ut = new UnionType(); - ut.components = new Component[0]; - UnionBinding errorBinding = (UnionBinding) params.bindingScheme.getBinding( ut ); - MethodTypeBinding onEventsBinding = new MethodTypeBinding("onEvents", requestBinding, responseBinding, errorBinding); - - try { - handler = new ClientRecord(); - handler.clientMethodInterface = connection.getRemoteMethodInterface(); - handler.onEventsMethod = handler.clientMethodInterface.getMethod( onEventsBinding ); - handler.connection = connection; - handler.clientMethods = Methods.createProxy(IWireClient.class, handler.connection.getRemoteMethodInterface()); - } catch (MethodNotSupportedException e) { - throw new WireException(e); - } - - connection.addConnectionListener( new ConnectionListener() { - @Override - public void onError(Exception error) { - TcpConnection connection = TcpConnection.getCurrentConnection(); - ClientRecord handler = clients.remove(connection); - if (handler != null) handler.dispose(); - } - @Override - public void onClosed() { - TcpConnection connection = TcpConnection.getCurrentConnection(); - ClientRecord handler = clients.remove(connection); - if (handler != null) handler.dispose(); - }}); - clients.put(connection, handler); - } - return handler; - } - } catch (BindingConstructionException e) { - throw new WireException( e ); - } - } - - AccRecord getAccessor(int accId) throws WireException { - ClientRecord client = getClient(); - AccRecord acc = client.accessorTable.getRight(accId); - if (acc==null) throw new WireException("Invalid accessor id"); - return acc; - } - - LisRecord getListener(int accId, int lisId) throws WireException { - ClientRecord client = getClient(); - LisRecord lis = client.listenerTable.get(lisId); - return lis; - } - - // Assumption is that each method is invoked in a connection thread. - // Therefore object synchronization is unnecessary. - - @Override - public AccessorInfo openAccessor(ChildReference ref) throws WireException { - try { - ClientRecord client = getClient(); - AccRecord acc = new AccRecord(); - acc.accessor = accessor.getComponent(ref); - acc.accId = client.accIdCounter++; - acc.reference = ref; - acc.type = accessor.type(); - acc.binding = params.bindingScheme.getBinding(acc.type); - client.accessorTable.map(acc.accId, acc); - AccessorInfo ai = new AccessorInfo(); - ai.accessorId = acc.accId; - ai.type = acc.type; - return ai; - } catch (AccessorConstructionException e) { - throw new WireException( e ); - } catch (BindingConstructionException e) { - throw new WireException( e ); - } - } - - @Override - public int closeAccessors(Integer[] accIds) throws WireException { - ClientRecord client = getClient(); - for (Integer accId : accIds) { - client.accessorTable.removeWithLeft(accId); - } - return 0; - } - @Override - public MutableVariant getValue(int accId) throws WireException { - try { - AccRecord acc = getAccessor(accId); - Object value = acc.accessor.getValue(acc.binding); - return new MutableVariant(acc.binding, value); - } catch (AccessorException e) { - throw new WireException(e); - } - } - - @Override - public ApplyResult apply(int accId, Event[] changeSet, boolean rollback) { - ApplyResult result = new ApplyResult(); - result.rollbackLog = rollback ? new LinkedList() : null; - try { - AccRecord acc = getAccessor(accId); - List changeSetList = new ArrayList( changeSet.length ); - for (Event e : changeSet) changeSetList.add( e ); - acc.accessor.apply(changeSetList, result.rollbackLog); - } catch (WireException e) { - result.error = e; - } catch (AccessorException e) { - result.error = new WireException(e); - } - return result; - } - - @Override - public int addListener(int accId, InterestSet interestSet, - ChildReference path) throws WireException { - try { - - ClientRecord client = getClient(); - AccRecord acc = getAccessor(accId); - final LisRecord lis = new LisRecord(); - lis.accId = accId; - lis.is = interestSet; - lis.lisId = client.listIdCounter++; - lis.listener = new Listener() { - @Override - public void onEvents(Collection events) { - try { - // Push event to client - ClientRecord client = getClient(); - Event[] eventArray = events.toArray( new Event[events.size()] ); - -// client.clientMethods.onEvents(lis.lisId, eventArray); - - // Invoke async. - OnEventsRequest req = new OnEventsRequest(); - req.arg1 = lis.lisId; - req.arg2 = eventArray; - client.onEventsMethod.invoke( req ); - } catch (WireException e) { - } - }}; - acc.accessor.addListener(lis.listener, interestSet, path, null /*TODO Dispatch thread*/); - client.listenerTable.put(lis.lisId, lis); - return lis.lisId; - } catch (AccessorException e) { - throw new WireException(e.getClass().getName()+": "+e.getMessage()); - } - } - - @Override - public int removeListener(int lisId) throws WireException { - try { - ClientRecord client = getClient(); - LisRecord lis = client.listenerTable.remove(lisId); - AccRecord acc = getAccessor(lis.accId); - acc.accessor.removeListener(lis.listener); - return 0; - } catch (AccessorException e) { - throw new WireException( e ); - } - } - - @Override - public int size(int accId) throws WireException { - try { - Accessor acc = getAccessor(accId).accessor; - if (acc instanceof ArrayAccessor) return ((ArrayAccessor)acc).size(); - if (acc instanceof RecordAccessor) return ((RecordAccessor)acc).count(); - if (acc instanceof MapAccessor) return ((MapAccessor)acc).size(); - if (acc instanceof UnionAccessor) return ((UnionAccessor)acc).count(); - throw new WireException("Cannot get size for "+acc.getClass().getName()); - } catch (AccessorException e) { - throw new WireException( e ); - } - } - - @Override - public int clear(int accId) throws WireException { - try { - Accessor acc = getAccessor(accId).accessor; - if (acc instanceof ArrayAccessor) { - ArrayAccessor aa = ((ArrayAccessor)acc); - aa.remove(0, aa.size()); - return 0; - } - if (acc instanceof MapAccessor) { - ((MapAccessor)acc).clear(); - return 0; - } - throw new WireException("Cannot clear "+acc.getClass().getName()); - } catch (AccessorException e) { - throw new WireException( e ); - } - } -/* - @Override - public void add(int accId, int index, Variant[] values) throws WireException { - try { - Accessor acc = getAccessor(accId).accessor; - if (acc instanceof ArrayAccessor) { - ArrayAccessor aa = (ArrayAccessor) acc; - if (values.length==0) return; - - Binding b = values[0].getBinding(); - boolean sameBinding = true; - for (Variant v : values) sameBinding &= v.getBinding()==b; - - if (sameBinding) { - Object[] objs = new Object[ values.length ]; - for (int i=0; i + */ +public class WireServer implements IWireServer { + + Accessor accessor; + Map clients = Collections.synchronizedMap( new HashMap() ); + MethodInterface mi; + AccessorParams params = AccessorParams.DEFAULT; + + public WireServer(Accessor accessor) { + this.accessor = accessor; + try { + this.mi = Methods.bindInterface(IWireServer.class, this); + } catch (BindingConstructionException e) { + throw new RuntimeException(e); + } + } + + public MethodInterface getMethodInterface() { + return mi; + } + + static class ClientRecord { + TcpConnection connection; + MethodInterface clientMethodInterface; + Method onEventsMethod; + IWireClient clientMethods; + int accIdCounter = 0; + int listIdCounter = 0; + BijectionMap accessorTable = new BijectionMap(); + Map listenerTable = new HashMap(); + + public void dispose() { + } + } + + static class AccRecord { + int accId; + Accessor accessor; + ChildReference reference; + Datatype type; + Binding binding; + } + + static class LisRecord { + int lisId; + int accId; + InterestSet is; + Listener listener; + } + + static class OnEventsRequest { + public @Optional Integer arg1; + public @Optional Event[] arg2; + } + + /** + * Get or create new client handler associated with current thread. + * The result value depends on the current thread. + * + * @return client handler + * @throws WireException + */ + public ClientRecord getClient() throws WireException { + TcpConnection connection = TcpConnection.getCurrentConnection(); + if (connection == null) throw new WireException("Internal Error. This method must be invoked in ConnectionThread"); + try { + synchronized(clients) { + ClientRecord handler = clients.get( connection ); + if (handler == null) { + + RecordBinding requestBinding = (RecordBinding) Bindings.getBindingUnchecked( OnEventsRequest.class ); + Binding responseBinding = Bindings.INTEGER; + UnionType ut = new UnionType(); + ut.components = new Component[0]; + UnionBinding errorBinding = (UnionBinding) params.bindingScheme.getBinding( ut ); + MethodTypeBinding onEventsBinding = new MethodTypeBinding("onEvents", requestBinding, responseBinding, errorBinding); + + try { + handler = new ClientRecord(); + handler.clientMethodInterface = connection.getRemoteMethodInterface(); + handler.onEventsMethod = handler.clientMethodInterface.getMethod( onEventsBinding ); + handler.connection = connection; + handler.clientMethods = Methods.createProxy(IWireClient.class, handler.connection.getRemoteMethodInterface()); + } catch (MethodNotSupportedException e) { + throw new WireException(e); + } + + connection.addConnectionListener( new ConnectionListener() { + @Override + public void onError(Exception error) { + TcpConnection connection = TcpConnection.getCurrentConnection(); + ClientRecord handler = clients.remove(connection); + if (handler != null) handler.dispose(); + } + @Override + public void onClosed() { + TcpConnection connection = TcpConnection.getCurrentConnection(); + ClientRecord handler = clients.remove(connection); + if (handler != null) handler.dispose(); + }}); + clients.put(connection, handler); + } + return handler; + } + } catch (BindingConstructionException e) { + throw new WireException( e ); + } + } + + AccRecord getAccessor(int accId) throws WireException { + ClientRecord client = getClient(); + AccRecord acc = client.accessorTable.getRight(accId); + if (acc==null) throw new WireException("Invalid accessor id"); + return acc; + } + + LisRecord getListener(int accId, int lisId) throws WireException { + ClientRecord client = getClient(); + LisRecord lis = client.listenerTable.get(lisId); + return lis; + } + + // Assumption is that each method is invoked in a connection thread. + // Therefore object synchronization is unnecessary. + + @Override + public AccessorInfo openAccessor(ChildReference ref) throws WireException { + try { + ClientRecord client = getClient(); + AccRecord acc = new AccRecord(); + acc.accessor = accessor.getComponent(ref); + acc.accId = client.accIdCounter++; + acc.reference = ref; + acc.type = accessor.type(); + acc.binding = params.bindingScheme.getBinding(acc.type); + client.accessorTable.map(acc.accId, acc); + AccessorInfo ai = new AccessorInfo(); + ai.accessorId = acc.accId; + ai.type = acc.type; + return ai; + } catch (AccessorConstructionException e) { + throw new WireException( e ); + } catch (BindingConstructionException e) { + throw new WireException( e ); + } + } + + @Override + public int closeAccessors(Integer[] accIds) throws WireException { + ClientRecord client = getClient(); + for (Integer accId : accIds) { + client.accessorTable.removeWithLeft(accId); + } + return 0; + } + @Override + public MutableVariant getValue(int accId) throws WireException { + try { + AccRecord acc = getAccessor(accId); + Object value = acc.accessor.getValue(acc.binding); + return new MutableVariant(acc.binding, value); + } catch (AccessorException e) { + throw new WireException(e); + } + } + + @Override + public ApplyResult apply(int accId, Event[] changeSet, boolean rollback) { + ApplyResult result = new ApplyResult(); + result.rollbackLog = rollback ? new LinkedList() : null; + try { + AccRecord acc = getAccessor(accId); + List changeSetList = new ArrayList( changeSet.length ); + for (Event e : changeSet) changeSetList.add( e ); + acc.accessor.apply(changeSetList, result.rollbackLog); + } catch (WireException e) { + result.error = e; + } catch (AccessorException e) { + result.error = new WireException(e); + } + return result; + } + + @Override + public int addListener(int accId, InterestSet interestSet, + ChildReference path) throws WireException { + try { + + ClientRecord client = getClient(); + AccRecord acc = getAccessor(accId); + final LisRecord lis = new LisRecord(); + lis.accId = accId; + lis.is = interestSet; + lis.lisId = client.listIdCounter++; + lis.listener = new Listener() { + @Override + public void onEvents(Collection events) { + try { + // Push event to client + ClientRecord client = getClient(); + Event[] eventArray = events.toArray( new Event[events.size()] ); + +// client.clientMethods.onEvents(lis.lisId, eventArray); + + // Invoke async. + OnEventsRequest req = new OnEventsRequest(); + req.arg1 = lis.lisId; + req.arg2 = eventArray; + client.onEventsMethod.invoke( req ); + } catch (WireException e) { + } + }}; + acc.accessor.addListener(lis.listener, interestSet, path, null /*TODO Dispatch thread*/); + client.listenerTable.put(lis.lisId, lis); + return lis.lisId; + } catch (AccessorException e) { + throw new WireException(e.getClass().getName()+": "+e.getMessage()); + } + } + + @Override + public int removeListener(int lisId) throws WireException { + try { + ClientRecord client = getClient(); + LisRecord lis = client.listenerTable.remove(lisId); + AccRecord acc = getAccessor(lis.accId); + acc.accessor.removeListener(lis.listener); + return 0; + } catch (AccessorException e) { + throw new WireException( e ); + } + } + + @Override + public int size(int accId) throws WireException { + try { + Accessor acc = getAccessor(accId).accessor; + if (acc instanceof ArrayAccessor) return ((ArrayAccessor)acc).size(); + if (acc instanceof RecordAccessor) return ((RecordAccessor)acc).count(); + if (acc instanceof MapAccessor) return ((MapAccessor)acc).size(); + if (acc instanceof UnionAccessor) return ((UnionAccessor)acc).count(); + throw new WireException("Cannot get size for "+acc.getClass().getName()); + } catch (AccessorException e) { + throw new WireException( e ); + } + } + + @Override + public int clear(int accId) throws WireException { + try { + Accessor acc = getAccessor(accId).accessor; + if (acc instanceof ArrayAccessor) { + ArrayAccessor aa = ((ArrayAccessor)acc); + aa.remove(0, aa.size()); + return 0; + } + if (acc instanceof MapAccessor) { + ((MapAccessor)acc).clear(); + return 0; + } + throw new WireException("Cannot clear "+acc.getClass().getName()); + } catch (AccessorException e) { + throw new WireException( e ); + } + } +/* + @Override + public void add(int accId, int index, Variant[] values) throws WireException { + try { + Accessor acc = getAccessor(accId).accessor; + if (acc instanceof ArrayAccessor) { + ArrayAccessor aa = (ArrayAccessor) acc; + if (values.length==0) return; + + Binding b = values[0].getBinding(); + boolean sameBinding = true; + for (Variant v : values) sameBinding &= v.getBinding()==b; + + if (sameBinding) { + Object[] objs = new Object[ values.length ]; + for (int i=0; i