Newer
Older
AlgebraicDataflowArchitectureModel / AlgebraicDataflowArchitectureModel / src / simulator / interfaces / timers / TimerService.java
package simulator.interfaces.timers;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ScheduledThreadPoolExecutor;

import models.algebra.Constant;
import models.algebra.Expression;
import models.algebra.Term;
import models.dataConstraintModel.JsonTerm;
import models.dataConstraintModel.MapTerm;
import models.dataConstraintModel.ResourcePath;
import models.dataFlowModel.DataTransferChannel;
import simulator.Event;
import simulator.Resource;
import simulator.Simulator;
import simulator.SystemState;
import simulator.interfaces.INativeReceiver;

/**
 * Native receiver that connects timer resources of the simulated model to real Java timers.
 *
 * <p>The service listens on the {@code TimersUpdated} channel. Whenever the set of timer ids
 * carried by that channel changes, it shuts down the Java executors of the removed ids and
 * creates one {@link ScheduledThreadPoolExecutor} per added id. Each executor periodically runs
 * a {@code TimerEventSender} bound to the {@code TimerEvent} input channel.
 *
 * <p>The inner classes {@link TimerStart} and {@link TimerClear} are additional receivers for the
 * {@code TimerStart} and {@code TimerClear} channels; they operate on the same {@link #timers} map.
 *
 * <p>NOTE(review): {@link #timers} is a plain {@code HashMap}; this class assumes that all
 * {@code onReceiveFromModel} callbacks are delivered from a single thread -- confirm against
 * the simulator.
 */
public class TimerService implements INativeReceiver {
	/** Name of the model channel that reports the current and next timer maps. */
	public final String timersUpdatedChannelName = "TimersUpdated";
	/** Name of the model input channel the scheduled senders are bound to. */
	public final String timerEventChannelName = "TimerEvent";

	protected final Simulator simulator;
	/** Running Java timers, keyed by timer id. */
	protected final Map<String, ScheduledThreadPoolExecutor> timers;
	protected DataTransferChannel timersUpdatedChannel;
	protected DataTransferChannel timerEventChannel;

	/**
	 * Looks up the {@code TimersUpdated} and {@code TimerEvent} channels in the simulator's model
	 * and registers this service as a native receiver of the {@code TimersUpdated} channel.
	 *
	 * @param simulator the simulator whose model contains the timer channels
	 */
	public TimerService(Simulator simulator) {
		this.simulator = simulator;
		timers = new HashMap<>();
		timersUpdatedChannel = (DataTransferChannel) simulator.getModel().getChannel(timersUpdatedChannelName);
		timerEventChannel = (DataTransferChannel) simulator.getModel().getInputChannel(timerEventChannelName);
		simulator.addNativeReceiver(this, timersUpdatedChannel);
	}

	/**
	 * Synchronizes the Java timers with the timer map carried by the event.
	 *
	 * <p>The message is expected to be a term whose first child is the current timer map and whose
	 * second child is the next timer map. Ids present only in the current map are stopped; ids
	 * present only in the next map are started. Messages of any other shape are ignored.
	 *
	 * @param event the event received from the model
	 * @param nextSystemState the system state from which the timer resources are resolved
	 * @throws NumberFormatException if an {@code interval} constant is not a parsable {@code long}
	 * @throws IllegalArgumentException if an {@code interval} is not positive
	 */
	@Override
	public void onReceiveFromModel(Event event, SystemState nextSystemState) {
		Expression message = event.getMessage();
		if (!(message instanceof Term) || ((Term) message).getChildren().size() < 2) {
			return;
		}
		Expression curTimersExp = ((Term) message).getChild(0);
		Expression nextTimersExp = ((Term) message).getChild(1);
		if (!(curTimersExp instanceof MapTerm) || !(nextTimersExp instanceof MapTerm)) {
			return;
		}
		MapTerm curTimers = (MapTerm) curTimersExp;
		MapTerm nextTimers = (MapTerm) nextTimersExp;

		// Ids that disappeared (old) and ids that appeared (new) between the two maps.
		Set<String> oldTidSet = new HashSet<>(curTimers.keySet());
		Set<String> newTidSet = new HashSet<>(nextTimers.keySet());
		oldTidSet.removeAll(nextTimers.keySet());
		newTidSet.removeAll(curTimers.keySet());
		if (oldTidSet.isEmpty() && newTidSet.isEmpty()) {
			// The set of timers is unchanged.
			return;
		}

		// Remove old timers.
		for (String tid: oldTidSet) {
			stopTimer(tid);
		}

		// Add new timers.
		Resource timersResource = nextSystemState.getResource(event.getInputResource().getResourceIdentifier());
		for (String tid: newTidSet) {
			Expression value = nextTimers.get(tid);
			if (value instanceof JsonTerm) {
				Resource timerResource = timersResource.getChildrenMap().get(tid);
				startTimer(tid, (JsonTerm) value, timerResource);
			}
		}
	}

	/**
	 * Shuts down and unregisters the Java timer of the given id; does nothing if no timer is
	 * registered under that id (the lookup result used to be dereferenced unconditionally).
	 */
	private void stopTimer(String tid) {
		ScheduledThreadPoolExecutor timer = timers.remove(tid);
		if (timer != null) {
			// With the default executor policy, shutdown() also cancels periodic tasks.
			timer.shutdown();
		}
	}

	/**
	 * Creates, schedules and registers the Java timer for one newly added timer id. The timer is
	 * skipped when its {@code interval} field is missing or is not a {@link Constant}.
	 */
	private void startTimer(String tid, JsonTerm timerValue, Resource timerResource) {
		Expression intervalExp = timerValue.get("interval");
		if (!(intervalExp instanceof Constant)) {
			// Same guard as TimerStart: without a constant interval there is nothing to schedule.
			return;
		}
		long interval = Long.parseLong(((Constant) intervalExp).toString());

		// Connect Java timer and model.
		ResourcePath resPath = timerEventChannel.getOutputResources().iterator().next();
		TimerEventSender sender = new TimerEventSender(simulator, timerEventChannel, resPath, timerResource);	// timer => timerResource

		// One core thread: the ScheduledThreadPoolExecutor documentation advises against a core
		// pool size of zero because the pool may be left without a thread for due tasks.
		ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);
		timer.scheduleAtFixedRate(sender, 0, interval, TimeUnit.MILLISECONDS);

		// Register only after scheduling succeeded, and never leak an executor that was already
		// registered under the same id.
		ScheduledThreadPoolExecutor previous = timers.put(tid, timer);
		if (previous != null) {
			previous.shutdown();
		}
	}

	/**
	 * Native receiver of the {@code TimerStart} channel: schedules a periodic
	 * {@code TimerEventSender} on the already registered Java timer of the addressed timer id.
	 */
	public class TimerStart implements INativeReceiver {
		public final String timerStartChannelName = "TimerStart";
		protected DataTransferChannel timerStartChannel;
		protected DataTransferChannel timerEventChannel;

		/** Looks up the channels and registers this receiver on the {@code TimerStart} channel. */
		public TimerStart() {
			timerStartChannel = (DataTransferChannel) simulator.getModel().getChannel(timerStartChannelName);
			timerEventChannel = (DataTransferChannel) simulator.getModel().getInputChannel(timerEventChannelName);
			simulator.addNativeReceiver(this, timerStartChannel);
		}

		/**
		 * Starts periodic timer events for the timer identified by the last parameter of the
		 * event's input resource identifier, using the first message child as the interval in
		 * milliseconds. The event is ignored when either value is not a {@link Constant} or when
		 * no Java timer is registered under the id.
		 *
		 * @throws NumberFormatException if the interval constant is not a parsable {@code long}
		 * @throws IllegalArgumentException if the interval is not positive
		 */
		@Override
		public void onReceiveFromModel(Event event, SystemState nextSystemState) {
			Expression message = event.getMessage();
			if (!(message instanceof Term) || ((Term) message).getChildren().size() < 1) {
				return;
			}
			Expression tidExp = event.getInputResource().getResourceIdentifier().getLastParam();
			Expression intervalExp = ((Term) message).getChild(0);
			if (!(tidExp instanceof Constant) || !(intervalExp instanceof Constant)) {
				return;
			}
			String tid = ((Constant) tidExp).toString();
			long interval = Long.parseLong(((Constant) intervalExp).toString());
			ScheduledThreadPoolExecutor timer = timers.get(tid);
			if (timer == null) {
				// Unknown (or already cleared) timer id: nothing to schedule on.
				return;
			}
			TimerEventSender sender = new TimerEventSender(simulator, timerEventChannel, event.getInputResourcePath(), event.getInputResource());
			timer.scheduleAtFixedRate(sender, 0, interval, TimeUnit.MILLISECONDS);
		}
	}

	/**
	 * Native receiver of the {@code TimerClear} channel: shuts down and unregisters the Java
	 * timer of the addressed timer id.
	 */
	public class TimerClear implements INativeReceiver {
		public final String timerClearChannelName = "TimerClear";
		protected DataTransferChannel timerClearChannel;

		/** Looks up the channel and registers this receiver on the {@code TimerClear} channel. */
		public TimerClear() {
			timerClearChannel = (DataTransferChannel) simulator.getModel().getChannel(timerClearChannelName);
			simulator.addNativeReceiver(this, timerClearChannel);
		}

		/**
		 * Stops the timer identified by the last parameter of the event's input resource
		 * identifier. The event is ignored when the id is not a {@link Constant} or when no Java
		 * timer is registered under it.
		 */
		@Override
		public void onReceiveFromModel(Event event, SystemState nextSystemState) {
			Expression message = event.getMessage();
			if (!(message instanceof Term) || ((Term) message).getChildren().size() < 1) {
				return;
			}
			Expression tidExp = event.getInputResource().getResourceIdentifier().getLastParam();
			if (tidExp instanceof Constant) {
				stopTimer(((Constant) tidExp).toString());
			}
		}

	}
}