/******************************************************************************* * Copyright (c) 2010- Association for Decentralized Information Management in * Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package org.simantics.databoard.accessor.wire; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import org.simantics.databoard.Bindings; import org.simantics.databoard.Methods; import org.simantics.databoard.accessor.Accessor; import org.simantics.databoard.accessor.Accessor.Listener; import org.simantics.databoard.accessor.ArrayAccessor; import org.simantics.databoard.accessor.MapAccessor; import org.simantics.databoard.accessor.OptionalAccessor; import org.simantics.databoard.accessor.RecordAccessor; import org.simantics.databoard.accessor.UnionAccessor; import org.simantics.databoard.accessor.error.AccessorConstructionException; import org.simantics.databoard.accessor.error.AccessorException; import org.simantics.databoard.accessor.event.Event; import org.simantics.databoard.accessor.impl.AccessorParams; import org.simantics.databoard.accessor.interestset.InterestSet; import org.simantics.databoard.accessor.reference.ChildReference; import org.simantics.databoard.adapter.AdaptException; import org.simantics.databoard.annotations.Optional; import org.simantics.databoard.binding.ArrayBinding; import org.simantics.databoard.binding.Binding; import org.simantics.databoard.binding.MapBinding; import org.simantics.databoard.binding.RecordBinding; import org.simantics.databoard.binding.UnionBinding; import org.simantics.databoard.binding.error.BindingConstructionException; import org.simantics.databoard.binding.impl.ObjectArrayBinding; import org.simantics.databoard.binding.mutable.MutableVariant; import org.simantics.databoard.method.MethodInterface; import org.simantics.databoard.method.MethodInterface.Method; import org.simantics.databoard.method.MethodNotSupportedException; import org.simantics.databoard.method.MethodTypeBinding; import org.simantics.databoard.method.TcpConnection; import org.simantics.databoard.method.TcpConnection.ConnectionListener; import org.simantics.databoard.type.ArrayType; import org.simantics.databoard.type.Component; import org.simantics.databoard.type.Datatype; import org.simantics.databoard.type.MapType; import org.simantics.databoard.type.UnionType; import org.simantics.databoard.util.BijectionMap; /** * WireServer exposes an accessor over TCP/IP socket. * * @author Toni Kalajainen */ public class WireServer implements IWireServer { Accessor accessor; Map clients = Collections.synchronizedMap( new HashMap() ); MethodInterface mi; AccessorParams params = AccessorParams.DEFAULT; public WireServer(Accessor accessor) { this.accessor = accessor; try { this.mi = Methods.bindInterface(IWireServer.class, this); } catch (BindingConstructionException e) { throw new RuntimeException(e); } } public MethodInterface getMethodInterface() { return mi; } static class ClientRecord { TcpConnection connection; MethodInterface clientMethodInterface; Method onEventsMethod; IWireClient clientMethods; int accIdCounter = 0; int listIdCounter = 0; BijectionMap accessorTable = new BijectionMap(); Map listenerTable = new HashMap(); public void dispose() { } } static class AccRecord { int accId; Accessor accessor; ChildReference reference; Datatype type; Binding binding; } static class LisRecord { int lisId; int accId; InterestSet is; Listener listener; } static class OnEventsRequest { public @Optional Integer arg1; public @Optional Event[] arg2; } /** * Get or create new client handler associated with current thread. * The result value depends on the current thread. * * @return client handler * @throws WireException */ public ClientRecord getClient() throws WireException { TcpConnection connection = TcpConnection.getCurrentConnection(); if (connection == null) throw new WireException("Internal Error. This method must be invoked in ConnectionThread"); try { synchronized(clients) { ClientRecord handler = clients.get( connection ); if (handler == null) { RecordBinding requestBinding = (RecordBinding) Bindings.getBindingUnchecked( OnEventsRequest.class ); Binding responseBinding = Bindings.INTEGER; UnionType ut = new UnionType(); ut.components = new Component[0]; UnionBinding errorBinding = (UnionBinding) params.bindingScheme.getBinding( ut ); MethodTypeBinding onEventsBinding = new MethodTypeBinding("onEvents", requestBinding, responseBinding, errorBinding); try { handler = new ClientRecord(); handler.clientMethodInterface = connection.getRemoteMethodInterface(); handler.onEventsMethod = handler.clientMethodInterface.getMethod( onEventsBinding ); handler.connection = connection; handler.clientMethods = Methods.createProxy(IWireClient.class, handler.connection.getRemoteMethodInterface()); } catch (MethodNotSupportedException e) { throw new WireException(e); } connection.addConnectionListener( new ConnectionListener() { @Override public void onError(Exception error) { TcpConnection connection = TcpConnection.getCurrentConnection(); ClientRecord handler = clients.remove(connection); if (handler != null) handler.dispose(); } @Override public void onClosed() { TcpConnection connection = TcpConnection.getCurrentConnection(); ClientRecord handler = clients.remove(connection); if (handler != null) handler.dispose(); }}); clients.put(connection, handler); } return handler; } } catch (BindingConstructionException e) { throw new WireException( e ); } } AccRecord getAccessor(int accId) throws WireException { ClientRecord client = getClient(); AccRecord acc = client.accessorTable.getRight(accId); if (acc==null) throw new WireException("Invalid accessor id"); return acc; } LisRecord getListener(int accId, int lisId) throws WireException { ClientRecord client = getClient(); LisRecord lis = client.listenerTable.get(lisId); return lis; } // Assumption is that each method is invoked in a connection thread. // Therefore object synchronization is unnecessary. @Override public AccessorInfo openAccessor(ChildReference ref) throws WireException { try { ClientRecord client = getClient(); AccRecord acc = new AccRecord(); acc.accessor = accessor.getComponent(ref); acc.accId = client.accIdCounter++; acc.reference = ref; acc.type = accessor.type(); acc.binding = params.bindingScheme.getBinding(acc.type); client.accessorTable.map(acc.accId, acc); AccessorInfo ai = new AccessorInfo(); ai.accessorId = acc.accId; ai.type = acc.type; return ai; } catch (AccessorConstructionException e) { throw new WireException( e ); } catch (BindingConstructionException e) { throw new WireException( e ); } } @Override public int closeAccessors(Integer[] accIds) throws WireException { ClientRecord client = getClient(); for (Integer accId : accIds) { client.accessorTable.removeWithLeft(accId); } return 0; } @Override public MutableVariant getValue(int accId) throws WireException { try { AccRecord acc = getAccessor(accId); Object value = acc.accessor.getValue(acc.binding); return new MutableVariant(acc.binding, value); } catch (AccessorException e) { throw new WireException(e); } } @Override public ApplyResult apply(int accId, Event[] changeSet, boolean rollback) { ApplyResult result = new ApplyResult(); result.rollbackLog = rollback ? new LinkedList() : null; try { AccRecord acc = getAccessor(accId); List changeSetList = new ArrayList( changeSet.length ); for (Event e : changeSet) changeSetList.add( e ); acc.accessor.apply(changeSetList, result.rollbackLog); } catch (WireException e) { result.error = e; } catch (AccessorException e) { result.error = new WireException(e); } return result; } @Override public int addListener(int accId, InterestSet interestSet, ChildReference path) throws WireException { try { ClientRecord client = getClient(); AccRecord acc = getAccessor(accId); final LisRecord lis = new LisRecord(); lis.accId = accId; lis.is = interestSet; lis.lisId = client.listIdCounter++; lis.listener = new Listener() { @Override public void onEvents(Collection events) { try { // Push event to client ClientRecord client = getClient(); Event[] eventArray = events.toArray( new Event[events.size()] ); // client.clientMethods.onEvents(lis.lisId, eventArray); // Invoke async. OnEventsRequest req = new OnEventsRequest(); req.arg1 = lis.lisId; req.arg2 = eventArray; client.onEventsMethod.invoke( req ); } catch (WireException e) { } }}; acc.accessor.addListener(lis.listener, interestSet, path, null /*TODO Dispatch thread*/); client.listenerTable.put(lis.lisId, lis); return lis.lisId; } catch (AccessorException e) { throw new WireException(e.getClass().getName()+": "+e.getMessage()); } } @Override public int removeListener(int lisId) throws WireException { try { ClientRecord client = getClient(); LisRecord lis = client.listenerTable.remove(lisId); AccRecord acc = getAccessor(lis.accId); acc.accessor.removeListener(lis.listener); return 0; } catch (AccessorException e) { throw new WireException( e ); } } @Override public int size(int accId) throws WireException { try { Accessor acc = getAccessor(accId).accessor; if (acc instanceof ArrayAccessor) return ((ArrayAccessor)acc).size(); if (acc instanceof RecordAccessor) return ((RecordAccessor)acc).count(); if (acc instanceof MapAccessor) return ((MapAccessor)acc).size(); if (acc instanceof UnionAccessor) return ((UnionAccessor)acc).count(); throw new WireException("Cannot get size for "+acc.getClass().getName()); } catch (AccessorException e) { throw new WireException( e ); } } @Override public int clear(int accId) throws WireException { try { Accessor acc = getAccessor(accId).accessor; if (acc instanceof ArrayAccessor) { ArrayAccessor aa = ((ArrayAccessor)acc); aa.remove(0, aa.size()); return 0; } if (acc instanceof MapAccessor) { ((MapAccessor)acc).clear(); return 0; } throw new WireException("Cannot clear "+acc.getClass().getName()); } catch (AccessorException e) { throw new WireException( e ); } } /* @Override public void add(int accId, int index, Variant[] values) throws WireException { try { Accessor acc = getAccessor(accId).accessor; if (acc instanceof ArrayAccessor) { ArrayAccessor aa = (ArrayAccessor) acc; if (values.length==0) return; Binding b = values[0].getBinding(); boolean sameBinding = true; for (Variant v : values) sameBinding &= v.getBinding()==b; if (sameBinding) { Object[] objs = new Object[ values.length ]; for (int i=0; i