]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java
Merge "Remove unused import in DeleteHandler"
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / GraphClientImpl2.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.acorn;
13
14 import java.io.IOException;
15 import java.nio.file.Path;
16 import java.util.ArrayList;
17 import java.util.LinkedList;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.Semaphore;
21 import java.util.concurrent.ThreadFactory;
22 import java.util.concurrent.TimeUnit;
23
24 import org.simantics.acorn.exception.AcornAccessVerificationException;
25 import org.simantics.acorn.exception.IllegalAcornStateException;
26 import org.simantics.acorn.internal.ClusterChange;
27 import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
28 import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
29 import org.simantics.acorn.lru.ClusterInfo;
30 import org.simantics.acorn.lru.ClusterStreamChunk;
31 import org.simantics.acorn.lru.ClusterUpdateOperation;
32 import org.simantics.acorn.lru.ClusterChangeSet.Entry;
33 import org.simantics.db.ClusterCreator;
34 import org.simantics.db.Database;
35 import org.simantics.db.ServiceLocator;
36 import org.simantics.db.common.utils.Logger;
37 import org.simantics.db.exception.DatabaseException;
38 import org.simantics.db.exception.SDBException;
39 import org.simantics.db.server.ProCoreException;
40 import org.simantics.db.service.ClusterSetsSupport;
41 import org.simantics.db.service.ClusterUID;
42 import org.simantics.db.service.LifecycleSupport;
43 import org.simantics.utils.datastructures.Pair;
44 import org.simantics.utils.logging.TimeLogger;
45
46 import gnu.trove.map.hash.TLongObjectHashMap;
47
48 public class GraphClientImpl2 implements Database.Session {
49         
50         public static final boolean DEBUG = false;
51
52         public final ClusterManager clusters;
53         
54         private TransactionManager transactionManager = new TransactionManager();
55         private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
56         private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
57
58         private Path dbFolder;
59         private final Database database;
60         private ServiceLocator locator;
61         private MainProgram mainProgram;
62
63         static class ClientThreadFactory implements ThreadFactory {
64                 
65                 final String name;
66                 final boolean daemon;
67                 
68                 public ClientThreadFactory(String name, boolean daemon) {
69                         this.name = name;
70                         this.daemon = daemon;
71                 }
72                 
73                 @Override
74                 public Thread newThread(Runnable r) {
75                         Thread thread = new Thread(r, name);
76                         thread.setDaemon(daemon);
77                         return thread;
78                 }
79         }
80
81         public GraphClientImpl2(Database database, Path dbFolder, ServiceLocator locator) throws IOException {
82             this.database = database;
83             this.dbFolder = dbFolder;
84             this.locator = locator;
85             this.clusters = new ClusterManager(dbFolder);
86             load();
87             ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); 
88             cssi.setReadDirectory(clusters.lastSessionDirectory);
89             mainProgram = new MainProgram(this, clusters);
90             executor.execute(mainProgram);
91         }
92
93         public Path getDbFolder() {
94             return dbFolder;
95         }
96
97         public void tryMakeSnapshot() throws IOException {
98                 
99             if (isClosing || unexpectedClose)
100                 return;
101             
102                 saver.execute(new Runnable() {
103
104                         @Override
105                         public void run() {
106                                 Transaction tr = null;
107                                 try {
108                                         // First take a write transaction
109                                         tr = askWriteTransaction(-1);
110                                         // Then make sure that MainProgram is idling
111                                         mainProgram.mutex.acquire();
112                                         try {
113                                                 synchronized(mainProgram) {
114                                                         if(mainProgram.operations.isEmpty()) {
115                                                                 makeSnapshot(false);
116                                                         } else {
117                                                                 // MainProgram is becoming busy again - delay snapshotting
118                                                                 return;
119                                                         }
120                                                 }
121                                         } finally {
122                                                 mainProgram.mutex.release();
123                                         }
124                                 } catch (IllegalAcornStateException | ProCoreException e) {
125                                         Logger.defaultLogError("Snapshotting failed", e);
126                                         unexpectedClose = true;
127                                 } catch (InterruptedException e) {
128                                         Logger.defaultLogError("Snapshotting interrupted", e);
129                                 } finally {
130                                         try {
131                                                 if(tr != null)
132                                                         endTransaction(tr.getTransactionId());
133                                                 if (unexpectedClose) {
134                                                 LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
135                             try {
136                                 support.close();
137                             } catch (DatabaseException e1) {
138                                 Logger.defaultLogError("Failed to close database as a safety measure due to failed snapshotting", e1);
139                             }
140                                                 }
141                                         } catch (ProCoreException e) {
142                                                 Logger.defaultLogError("Failed to end snapshotting write transaction", e);
143                                         }
144                                 }
145                         }
146                 });
147         }
148         
149     public void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
150         clusters.makeSnapshot(locator, fullSave);
151     }
152         
153         public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
154             try {
155             return clusters.clone(uid, creator);
156         } catch (AcornAccessVerificationException | IllegalAcornStateException | IOException e) {
157             unexpectedClose = true;
158             throw new DatabaseException(e);
159         }
160         }
161
162 //      private void save() throws IOException {
163 //              clusters.save();
164 //      }
165         
166         public void load() throws IOException {
167                 clusters.load();
168         }
169         
170 //      public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) {
171 //              clusters.modiFileEx(uid, resourceKey, offset, size, bytes, pos, support);
172 //      }
173
174         @Override
175         public Database getDatabase() {
176                 return database;
177         }
178
179         private boolean closed = false;
180         private boolean isClosing = false;
181         private boolean unexpectedClose = false;
182         
183         @Override
184         public void close() throws ProCoreException {
185             System.err.println("Closing " + this + " and mainProgram " + mainProgram);
186                 if(!closed && !isClosing) {
187                     isClosing = true;
188                         try {
189                             if (!unexpectedClose)
190                                 makeSnapshot(true);
191                                 
192                                 mainProgram.close();
193                                 clusters.shutdown();
194                                 executor.shutdown();
195                                 saver.shutdown();
196                                 boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
197                                 boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
198                                 
199                                 System.err.println("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
200                                 
201                                 mainProgram = null;
202                                 executor = null;
203                                 saver = null;
204                                 
205                         } catch (IllegalAcornStateException | InterruptedException e) {
206                                 throw new ProCoreException(e);
207                         }
208                         closed = true;
209                 }
210                 //impl.close();
211         }
212
213         @Override
214         public void open() throws ProCoreException {
215                 throw new UnsupportedOperationException();
216         }
217
218         @Override
219         public boolean isClosed() throws ProCoreException {
220                 return closed;
221         }
222         
223         @Override
224         public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
225                 clusters.state.headChangeSetId++;
226                 long committedChangeSetId = changeSetId + 1;
227                 try {
228                 clusters.commitChangeSet(committedChangeSetId, metadata);
229                 
230                 clusters.state.transactionId = transactionId;
231                 
232                 mainProgram.committed();
233                 
234                 TimeLogger.log("Accepted commit");
235                 } catch (IllegalAcornStateException e) {
236                     throw new ProCoreException(e);
237                 }
238         }
239
240         @Override
241         public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
242                 // Accept and finalize current transaction and then undo it
243                 acceptCommit(transactionId, changeSetId, metadata);
244
245                 try {
246                         undo(new long[] {changeSetId+1}, onChangeSetUpdate);
247                         clusters.state.headChangeSetId++;
248                         return clusters.state.headChangeSetId;
249                 } catch (SDBException e) {
250                         Logger.defaultLogError("Failed to undo cancelled transaction", e);
251                         throw new ProCoreException(e);
252                 }
253         }
254
255         @Override
256         public Transaction askReadTransaction() throws ProCoreException {
257                 return transactionManager.askReadTransaction();
258         }
259
260         enum TransactionState {
261                 IDLE,WRITE,READ
262         }
263         
264         class TransactionRequest {
265                 public TransactionState state;
266                 public Semaphore semaphore;
267                 public TransactionRequest(TransactionState state, Semaphore semaphore) {
268                         this.state = state;
269                         this.semaphore = semaphore;
270                 }
271         }
272
273         class TransactionManager {
274
275                 private TransactionState currentTransactionState = TransactionState.IDLE;
276                 
277                 private int reads = 0;
278                 
279                 LinkedList<TransactionRequest> requests = new LinkedList<TransactionRequest>();
280                 
281                 TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<TransactionRequest>();
282                 
283                 private synchronized Transaction makeTransaction(TransactionRequest req) {
284                         
285                         final int csId = clusters.state.headChangeSetId;
286                         final long trId = clusters.state.transactionId+1;
287                         requestMap.put(trId, req);
288                         return new Transaction() {
289                                 
290                                 @Override
291                                 public long getTransactionId() {
292                                         return trId;
293                                 }
294                                 
295                                 @Override
296                                 public long getHeadChangeSetId() {
297                                         return csId;
298                                 }
299                         };
300                 }
301                 
302                 /*
303                  * This method cannot be synchronized since it waits and must support multiple entries
304                  * by query thread(s) and internal transactions such as snapshot saver
305                  */
306                 public Transaction askReadTransaction() throws ProCoreException {
307                 
308                         Semaphore semaphore = new Semaphore(0);
309                         
310                         TransactionRequest req = queue(TransactionState.READ, semaphore);
311                         
312                         try {
313                                 semaphore.acquire();
314                         } catch (InterruptedException e) {
315                                 throw new ProCoreException(e);
316                         }
317                         
318                         return makeTransaction(req);
319
320                 }
321
322                 private synchronized void dispatch() {
323                         TransactionRequest r = requests.removeFirst();
324                         if(r.state == TransactionState.READ) reads++;
325                         r.semaphore.release();
326                 }
327                 
328                 private synchronized void processRequests() {
329                         
330                         while(true) {
331
332                                 if(requests.isEmpty()) return;
333                                 TransactionRequest req = requests.peek();
334
335                                 if(currentTransactionState == TransactionState.IDLE) {
336                                 
337                                         // Accept anything while IDLE
338                                         currentTransactionState = req.state;
339                                         dispatch();
340                                         
341                                 } else if (currentTransactionState == TransactionState.READ) {
342                                         
343                                         if(req.state == currentTransactionState) {
344
345                                                 // Allow other reads
346                                                 dispatch();
347
348                                         } else {
349                                                 
350                                                 // Wait
351                                                 return;
352                                                 
353                                         }
354                                         
355                                 }  else if (currentTransactionState == TransactionState.WRITE) {
356
357                                         // Wait
358                                         return;
359                                         
360                                 }
361                                 
362                         }
363                         
364                 }
365                 
366                 private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
367                         TransactionRequest req = new TransactionRequest(state, semaphore); 
368                         requests.addLast(req);
369                         processRequests();
370                         return req;
371                 }
372                 
373                 /*
374                  * This method cannot be synchronized since it waits and must support multiple entries
375                  * by query thread(s) and internal transactions such as snapshot saver
376                  */
377                 public Transaction askWriteTransaction() throws IllegalAcornStateException {
378                         
379                         Semaphore semaphore = new Semaphore(0);
380                         TransactionRequest req = queue(TransactionState.WRITE, semaphore);
381                         
382                         try {
383                                 semaphore.acquire();
384                         } catch (InterruptedException e) {
385                                 throw new IllegalAcornStateException(e);
386                         }
387                         mainProgram.startTransaction(clusters.state.headChangeSetId+1);
388                         return makeTransaction(req);
389                 }
390                 
391                 public synchronized long endTransaction(long transactionId) throws ProCoreException {
392                         
393                         TransactionRequest req = requestMap.remove(transactionId);
394                         if(req.state == TransactionState.WRITE) {
395                                 currentTransactionState = TransactionState.IDLE;
396                                 processRequests();
397                         } else {
398                                 reads--;
399                                 if(reads == 0) {
400                                         currentTransactionState = TransactionState.IDLE;
401                                         processRequests();
402                                 }
403                         }
404                         return clusters.state.transactionId;
405                 }
406
407         }
408         
409         @Override
410         public Transaction askWriteTransaction(final long transactionId) throws ProCoreException {
411                 try {
412                     if (isClosing || unexpectedClose || closed) {
413                         throw new ProCoreException("GraphClientImpl2 is already closing so no more write transactions allowed!");
414                     }
415             return transactionManager.askWriteTransaction();
416         } catch (IllegalAcornStateException e) {
417             throw new ProCoreException(e);
418         }
419         }
420
421         @Override
422         public long endTransaction(long transactionId) throws ProCoreException {
423                 return transactionManager.endTransaction(transactionId);
424         }
425
426         @Override
427         public String execute(String command) throws ProCoreException {
428                 // This is called only by WriteGraphImpl.commitAccessorChanges
429                 // We can ignore this in Acorn
430                 return "";
431         }
432
433         @Override
434         public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {
435                 try {
436             return clusters.getMetadata(changeSetId);
437         } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
438             throw new ProCoreException(e);
439         }
440         }
441
442         @Override
443         public ChangeSetData getChangeSetData(long minChangeSetId,
444                         long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
445                         throws ProCoreException {
446                 
447                 new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
448                 return null;
449                 
450         }
451
452         @Override
453         public ChangeSetIds getChangeSetIds() throws ProCoreException {
454                 throw new UnsupportedOperationException();
455         }
456
457         @Override
458         public Cluster getCluster(byte[] clusterId) throws ProCoreException {
459                 throw new UnsupportedOperationException();
460         }
461
462         @Override
463         public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)
464                         throws ProCoreException {
465                 throw new UnsupportedOperationException();
466         }
467
468         @Override
469         public ClusterIds getClusterIds() throws ProCoreException {
470                 try {
471             return clusters.getClusterIds();
472         } catch (IllegalAcornStateException e) {
473             throw new ProCoreException(e);
474         }
475         }
476
477         @Override
478         public Information getInformation() throws ProCoreException {
479                 return new Information() {
480
481                         @Override
482                         public String getServerId() {
483                                 return "server";
484                         }
485
486                         @Override
487                         public String getProtocolId() {
488                                 return "";
489                         }
490
491                         @Override
492                         public String getDatabaseId() {
493                                 return "database";
494                         }
495
496                         @Override
497                         public long getFirstChangeSetId() {
498                                 return 0;
499                         }
500                         
501                 };
502         }
503
504         @Override
505         public Refresh getRefresh(long changeSetId) throws ProCoreException {
506                 
507                 final ClusterIds ids = getClusterIds();
508                 
509                 return new Refresh() {
510
511                         @Override
512                         public long getHeadChangeSetId() {
513                                 return clusters.state.headChangeSetId;
514                         }
515
516                         @Override
517                         public long[] getFirst() {
518                                 return ids.getFirst();
519                         }
520
521                         @Override
522                         public long[] getSecond() {
523                                 return ids.getSecond();
524                         }
525                         
526                 };
527                 
528         }
529
530         public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
531                 return clusters.getResourceFile(clusterUID, resourceIndex);
532         }
533
534         @Override
535         public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
536                 try {
537             return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
538         } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
539             throw new ProCoreException(e);
540         }
541         }
542
543         @Override
544         public long reserveIds(int count) throws ProCoreException {
545                 return clusters.state.reservedIds++;
546         }
547
548         @Override
549         public void updateCluster(byte[] operations) throws ProCoreException {
550             ClusterInfo info = null;
551             try {
552                 ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
553                 info = clusters.clusterLRU.getOrCreate(operation.uid, true);
554                 if(info == null)
555                     throw new IllegalAcornStateException("info == null for operation " + operation);
556                 info.acquireMutex();
557                         info.scheduleUpdate();
558                         mainProgram.schedule(operation);
559                 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
560             throw new ProCoreException(e);
561         } finally {
562             if (info != null)
563                 info.releaseMutex();
564                 }
565         }
566
567         private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
568
569                 String[] ss = ccsId.split("\\.");
570                 String chunkKey = ss[0];
571                 int chunkOffset = Integer.parseInt(ss[1]);
572                 ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
573                 if(chunk == null) throw new IllegalAcornStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
574                 chunk.acquireMutex();
575                 try {
576                         return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
577                 } catch (DatabaseException e) {
578                     throw e;
579                 } catch (Throwable t) {
580                         throw new IllegalStateException(t);
581                 } finally {
582                         chunk.releaseMutex();
583                 }
584         }
585         
586         private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
587                 UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
588
589                 int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());
590
591                 clusters.clusterLRU.acquireMutex();
592                 try {
593
594                         ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
595                         for(int i=0;i<proc.entries.size();i++) {
596                                 
597                                 Entry e = proc.entries.get(proc.entries.size() - 1 - i);
598                                 e.process(clusters, cs, clusterKey);
599                         }
600                         cs.flush();
601
602                 } finally {
603                         clusters.clusterLRU.releaseMutex();
604                 }
605         }
606         
607         @Override
608         public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
609             try {
610                 final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
611                 
612                 UndoClusterSupport support = new UndoClusterSupport(clusters);
613                 
614                 final int changeSetId = clusters.state.headChangeSetId;
615                 
616                 if(ClusterUpdateProcessorBase.DEBUG)
617                         System.err.println(" === BEGIN UNDO ===");
618                 
619                 for(int i=0;i<changeSetIds.length;i++) {
620                         final long id = changeSetIds[changeSetIds.length-1-i];
621                         ArrayList<String> ccss = clusters.getChanges(id);
622     
623                         for(int j=0;j<ccss.size();j++) {
624                                 String ccsid = ccss.get(ccss.size()-j-1);
625                                 try {
626                                         if(ClusterUpdateProcessorBase.DEBUG)
627                                                 System.err.println("performUndo " + ccsid);
628                                         performUndo(ccsid, clusterChanges, support);
629                                 } catch (DatabaseException e) {
630                                         e.printStackTrace();
631                                 }
632                         }
633                 }
634     
635                 if(ClusterUpdateProcessorBase.DEBUG)
636                         System.err.println(" === END UNDO ===");
637     
638                 for(int i=0;i<clusterChanges.size();i++) {
639                         
640                         final int changeSetIndex = i;
641                         
642                         final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
643                         
644                         final ClusterUID cuid = pair.first;
645                         final byte[] data = pair.second;
646     
647                         onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
648     
649                                 @Override
650                                 public long getChangeSetId() {
651                                         return changeSetId;
652                                 }
653     
654                                 @Override
655                                 public int getChangeSetIndex() {
656                                         return 0;
657                                 }
658     
659                                 @Override
660                                 public int getNumberOfClusterChangeSets() {
661                                         return clusterChanges.size();
662                                 }
663     
664                                 @Override
665                                 public int getIndexOfClusterChangeSet() {
666                                         return changeSetIndex;
667                                 }
668     
669                                 @Override
670                                 public byte[] getClusterId() {
671                                         return cuid.asBytes();
672                                 }
673     
674                                 @Override
675                                 public boolean getNewCluster() {
676                                         return false;
677                                 }
678     
679                                 @Override
680                                 public byte[] getData() {
681                                         return data;
682                                 }
683     
684                         });
685                 }
686         } catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
687             throw new ProCoreException(e1);
688         }
689                 return false;
690         }
691         
692         public ServiceLocator getServiceLocator() {
693             return locator;
694         }
695
696     @Override
697     public boolean refreshEnabled() {
698         return false;
699     }
700
701     @Override
702     public boolean rolledback() {
703         return clusters.rolledback();
704     }
705
706         
707         
708         
709         
710         
711         
712         
713         
714         
715         
716         ////////////////////////
717         
718         
719         
720         
721         
722         
723         
724         
725         
726         
727         
728         
729 }
730