// Session
using Newtonsoft.Json.Linq;
using System;
using System.Collections;
using System.Diagnostics;
using System.Net.Sockets;
using System.Text;
namespace Aspera.Transfer
{
public abstract class Session
{
// ---- Identity -------------------------------------------------------------------------
// `id` is returned by getId(), overwritten by setUserStr(), used as the prefix of this
// class's log lines and passed to the FaspMessage constructor for outgoing messages.
protected internal string id;
protected internal Guid xferId;
protected internal int xferRetry;
protected internal int processIndex = 1;
protected internal int nbProcesses = 1;
// Current state; isFinished() treats FINISHED and FAILED as "finished".
protected internal SessionState state;
// Listeners registered through addListener() / removed through removeListener().
protected internal ArrayList _listenersList = new ArrayList();
// ---- Session parameters and statistics ------------------------------------------------
// The visible code mostly only reads these through the getters further down.
// NOTE(review): presumably they are filled in by subclasses while parsing incoming
// management messages (see processReadData) - confirm against the implementations.
protected internal Policy Adaptive;
protected internal string BlockInfo = "";
protected internal string Code = "";
protected internal string Cookie = "";
protected internal string errDescription = "";
protected internal string Direction = "";
protected internal string DiskInfo = "";
// Returned (unconverted) by getTotalElapsedMs().
protected internal ulong ElapsedUsec;
protected internal bool Encryption;
protected internal uint BWMeasurement;
// Returned by getRttMs().
protected internal uint Delay;
// Returned by getTotalWrittenBytes().
protected internal ulong FileBytes;
protected internal string Operation = "";
// Returned by getTotalTransferredBytes().
protected internal ulong TransferBytes;
protected internal string Host = "";
protected internal string Password = "";
protected internal uint priority;
protected internal string Progress = "";
protected internal string Query = "";
protected internal string QueryResponse = "";
protected internal string ServiceLevel = "";
protected internal string SessionId = "";
protected internal string Size = "";
protected internal string StartByte = "";
protected internal string Token = "";
protected internal string User = "";
protected internal string version = "";
protected internal ulong bandwidthCapKbps;
// Returned by getErrCode().
protected internal int errNum;
protected internal bool IsUpload = true;
protected internal ulong minimumRateKbps;
// Not read by any method visible in this class (getRttMs() returns Delay instead).
protected internal int rttMs;
protected internal ulong targetRateKbps;
// Not read by any method visible in this class (getTotalElapsedMs() returns ElapsedUsec instead).
protected internal long totalElapsedMs;
protected internal ulong totalLostBytes;
protected internal uint udpPort;
// Job order given to the constructor; stop() and terminate() consult its transfer
// parameters' `persist` flag. Stays null when the Session(bool) constructor is used.
protected internal JobOrder _order;
protected internal ulong preTransferFiles;
protected internal ulong preTransferBytes;
internal bool IsRemote;
internal bool IsOtherInitiated;
protected internal string manifestFilePath = "";
protected internal uint tcpPort;
protected internal string serverNodeId = "";
protected internal string clientNodeId = "";
protected internal string serverClusterId = "";
protected internal string clientClusterId = "";
protected internal int retryTimeout;
protected internal ulong filesFailed;
protected internal ulong filesComplete;
protected internal ulong filesSkipped;
// ---- Management channel and bookkeeping -----------------------------------------------
protected string msgBuffer;
// Socket used by sendMgmtMessage(); null until setWriteSocket() is called.
private Socket writeSocket;
protected bool firstTime = true;
// Set once in the constructors; exposed by getStartTime().
private int _startTime;
protected bool debugEnabled;
// Set through setPersistent(); addSource() refuses to run unless it is true.
protected bool persistent;
// Set to true by stopWhenIdle(); exposed by getPersistentSessionLocked().
protected bool persistentSessionLocked;
protected int transfersAttempted;
protected int transfersFailed;
protected int transfersPassed;
protected int transfersSkipped;
protected int sourcePathsScanAttempted;
protected int sourcePathsScanFailed;
protected int sourcePathsScanIrregular;
protected int sourcePathsScanExcluded;
// NOTE(review): the identifier of this JObject field is missing in this copy of the file
// (the declaration does not compile as written) - restore the name from the original source.
protected JObject ;
protected int pmtu;
/// <summary>
/// Creates a session for the given job order.
/// </summary>
/// <param name="order">Job order stored in <c>_order</c>; its transfer parameters are consulted by stop() and terminate().</param>
/// <param name="jobId">Identifier stored in <c>id</c> and returned by getId().</param>
/// <param name="debug">Stored in <c>debugEnabled</c>.</param>
protected Session(JobOrder order, string jobId, bool debug)
{
    _order = order;
    id = jobId;
    debugEnabled = debug;
    // DateTime.Now.Millisecond is only the 0-999 millisecond component of the current
    // second, so it cannot serve as a start time. Environment.TickCount is a millisecond
    // counter that fits the int-typed field, which makes differences between two
    // readings meaningful.
    _startTime = Environment.TickCount;
}
/// <summary>
/// Creates a session that is not bound to a job order (<c>_order</c> and <c>id</c> keep their defaults).
/// </summary>
/// <param name="debug">Stored in <c>debugEnabled</c>.</param>
protected Session(bool debug)
{
    // See the other constructor: a millisecond tick counter rather than the
    // millisecond component of the wall clock.
    _startTime = Environment.TickCount;
    debugEnabled = debug;
}
/// <summary>
/// Start hook. The base implementation does nothing; derived sessions may override it.
/// </summary>
/// <param name="port">Port number; unused by the base implementation.
/// NOTE(review): presumably the management port to use - confirm against overrides.</param>
internal virtual void start(int port)
{
}
/// <summary>
/// Hook that concrete sessions must implement. Within this class it is invoked by close().
/// NOTE(review): the name suggests it reacts to an unexpected exit of the ascp process - confirm against implementations.
/// </summary>
public abstract void handleAscpCrash();
/// <summary>
/// Requests new rate settings by sending a "RATE" management message for this session.
/// </summary>
/// <param name="targetRate">Value sent in the "Rate" field.</param>
/// <param name="minimumRate">Value sent in the "MinRate" field.</param>
/// <param name="policy">Policy, converted with PolicyHelper.getMgmtValue and sent in the "Adaptive" field.</param>
/// <exception cref="FaspManagerException">
/// The session state is FINISHED, or no write socket has been set yet.
/// </exception>
public virtual void setRate(int targetRate, int minimumRate, Policy policy)
{
    Logger.Log(TraceEventType.Information, id + " - Setting rate parameters. target=" + targetRate + " minimum=" + minimumRate + " policy=" + policy);

    if (state == SessionState.FINISHED)
    {
        throw new FaspManagerException("Rate Change Failed: Transfer already completed");
    }

    if (writeSocket == null)
    {
        throw new FaspManagerException("\nTransfer has not started yet. It is too early to change the rate. ");
    }

    FaspMessage rateRequest = new FaspMessage(id);
    rateRequest.setType("RATE");
    rateRequest.addField("Rate", targetRate.ToString());
    rateRequest.addField("MinRate", minimumRate.ToString());
    rateRequest.addField("Adaptive", PolicyHelper.getMgmtValue(policy));

    // The send result is not inspected; failures are only logged by sendMgmtMessage.
    sendMgmtMessage(rateRequest.constructMessage());
}
/// <summary>
/// Appends a listener to the session's listener list.
/// </summary>
/// <param name="listener">Listener to register; it is added as-is, without a null or duplicate check.</param>
public void addListener(TransferListener listener)
{
    this._listenersList.Add(listener);
}
/// <summary>
/// Removes the first occurrence of a listener from the session's listener list.
/// </summary>
/// <param name="listener">Listener to unregister; nothing happens if it is not in the list.</param>
public void removeListener(TransferListener listener)
{
    this._listenersList.Remove(listener);
}
/// <summary>
/// Closes the management write socket, if one has been set, and then invokes
/// <see cref="handleAscpCrash"/>.
/// </summary>
public void close()
{
    // writeSocket stays null until setWriteSocket() is called. Without this check,
    // closing a session whose channel never connected threw NullReferenceException
    // and handleAscpCrash() was never reached.
    if (writeSocket != null)
    {
        writeSocket.Close();
    }

    handleAscpCrash();
}
/// <summary>
/// Logs the cancellation and delegates to <see cref="terminate"/>.
/// </summary>
/// <exception cref="FaspManagerException">Propagated from terminate().</exception>
public virtual void cancel()
{
    string logLine = id + " - Cancelling the session";
    Logger.Log(TraceEventType.Information, logLine);
    terminate();
}
/// <summary>
/// Stops a persistent session by delegating to <see cref="terminate"/>.
/// </summary>
/// <exception cref="FaspManagerException">
/// The session has no job order, or the order's transfer parameters do not have
/// <c>persist</c> set; also propagated from terminate().
/// </exception>
public virtual void stop()
{
    bool orderIsPersistent = _order != null && _order.getXferParams().persist;
    if (!orderIsPersistent)
    {
        throw new FaspManagerException("Not a persistent session. Use cancelTransfer");
    }

    Logger.Log(TraceEventType.Information, id + " - Stopping the session");
    terminate();
}
/// <summary>
/// Ends the session by sending a management message: "DONE" when the job order's
/// transfer parameters have <c>persist</c> set, "CANCEL" otherwise.
/// If the state is already FINISHED or FAILED, only a warning is logged.
/// </summary>
/// <exception cref="FaspManagerException">No write socket has been set yet.</exception>
public virtual void terminate()
{
    Logger.Log(TraceEventType.Information, id + " - Terminating the session");

    bool alreadyOver = state == SessionState.FINISHED || state == SessionState.FAILED;
    if (alreadyOver)
    {
        Logger.Log(TraceEventType.Warning, "Transfer already completed. Cannot terminate it anymore.");
        return;
    }

    if (writeSocket == null)
    {
        throw new FaspManagerException("The channel has not been connected. It is too early to terminate the job.");
    }

    FaspMessage endRequest = new FaspMessage(id);
    bool orderIsPersistent = _order != null && _order.getXferParams().persist;
    endRequest.setType(orderIsPersistent ? "DONE" : "CANCEL");

    // The send result is not inspected; failures are only logged by sendMgmtMessage.
    sendMgmtMessage(endRequest.constructMessage());
}
/// <summary>
/// Sends a "DONE" management message with <c>Operation=Linger</c> and marks the
/// persistent session as locked.
/// </summary>
/// <exception cref="FaspManagerException">
/// The state is FINISHED or FAILED, or no write socket has been set yet.
/// </exception>
public virtual void stopWhenIdle()
{
    Logger.Log(TraceEventType.Information, id + " - Stopping the session with 'linger' option");

    bool alreadyOver = state == SessionState.FINISHED || state == SessionState.FAILED;
    if (alreadyOver)
    {
        throw new FaspManagerException("Transfer already completed. Cannot stop it anymore.");
    }

    if (writeSocket == null)
    {
        throw new FaspManagerException("The channel has not been connected. It is too early to stop the job.");
    }

    FaspMessage lingerRequest = new FaspMessage(id);
    lingerRequest.setType("DONE");
    lingerRequest.addField("Operation", "Linger");
    sendMgmtMessage(lingerRequest.constructMessage());

    // The flag is set whether or not the send succeeded; the result above is ignored.
    persistentSessionLocked = true;
}
/// <summary>
/// Tells whether the session has reached the FAILED or FINISHED state.
/// </summary>
/// <returns><c>true</c> when the state is FAILED or FINISHED; otherwise <c>false</c>.</returns>
public bool isFinished()
{
    return state == SessionState.FAILED || state == SessionState.FINISHED;
}
/// <summary>
/// Gets the session identifier.
/// </summary>
/// <returns>The current value of <c>id</c>.</returns>
public string getId()
{
    return this.id;
}
/// <summary>
/// Gets the transfer identifier.
/// </summary>
/// <returns>The current value of <c>xferId</c>.</returns>
public Guid getXferId()
{
    return this.xferId;
}
/// <summary>
/// Sets the transfer identifier returned by getXferId().
/// </summary>
/// <param name="xferId">New value for the <c>xferId</c> field.</param>
protected internal void setXferId(Guid xferId)
{
this.xferId = xferId;
}
/// <summary>
/// Gets the transfer retry value.
/// </summary>
/// <returns>The current value of <c>xferRetry</c>.</returns>
public int getXferRetry()
{
    return this.xferRetry;
}
/// <summary>
/// Sets the transfer retry value returned by getXferRetry().
/// </summary>
/// <param name="xferRetry">New value for the <c>xferRetry</c> field.</param>
protected internal void setXferRetry(int xferRetry)
{
this.xferRetry = xferRetry;
}
/// <summary>
/// Gets the process index (the field defaults to 1).
/// </summary>
/// <returns>The current value of <c>processIndex</c>.</returns>
public int getProcessIndex()
{
    return this.processIndex;
}
/// <summary>
/// Sets the process index returned by getProcessIndex().
/// </summary>
/// <param name="processIndex">New value for the <c>processIndex</c> field.</param>
protected internal void setProcessIndex(int processIndex)
{
this.processIndex = processIndex;
}
/// <summary>
/// Gets the number of processes (the field defaults to 1).
/// </summary>
/// <returns>The current value of <c>nbProcesses</c>.</returns>
public int getNbProcesses()
{
    return this.nbProcesses;
}
/// <summary>
/// Sets the number of processes returned by getNbProcesses().
/// </summary>
/// <param name="nbProcesses">New value for the <c>nbProcesses</c> field.</param>
protected internal void setNbProcesses(int nbProcesses)
{
this.nbProcesses = nbProcesses;
}
/// <summary>
/// Gets the <c>TransferBytes</c> counter as a signed 64-bit value.
/// </summary>
/// <returns><c>TransferBytes</c> converted with <see cref="Convert.ToInt64(ulong)"/>.</returns>
/// <exception cref="OverflowException">The counter exceeds <see cref="long.MaxValue"/>.</exception>
public long getTotalTransferredBytes()
{
    long transferred = Convert.ToInt64(this.TransferBytes);
    return transferred;
}
/// <summary>
/// Gets the socket used for outgoing management messages.
/// </summary>
/// <returns>The socket given to setWriteSocket(), or <c>null</c> if none has been set.</returns>
protected internal Socket getWriteSocket()
{
    return this.writeSocket;
}
/// <summary>
/// Gets the <c>SessionId</c> field (a separate value from the one returned by getId()).
/// </summary>
/// <returns>The current value of <c>SessionId</c>.</returns>
public string getSessionId()
{
    return this.SessionId;
}
/// <summary>
/// Gets the session cookie.
/// </summary>
/// <returns>The current value of <c>Cookie</c>.</returns>
public string getCookie()
{
    return this.Cookie;
}
/// <summary>
/// Sets the socket used by sendMgmtMessage() for outgoing management messages.
/// Several public operations refuse to run while this is still null.
/// </summary>
/// <param name="writeSocket">Socket to store in the <c>writeSocket</c> field.</param>
protected internal void setWriteSocket(Socket writeSocket)
{
this.writeSocket = writeSocket;
}
/// <summary>
/// Hook for incoming data. The base implementation does nothing; derived sessions may override it.
/// </summary>
/// <param name="message">Received text; unused by the base implementation.
/// NOTE(review): presumably a management message read from the channel - confirm against overrides.</param>
protected internal virtual void processReadData(string message)
{
}
/// <summary>
/// Encodes a management message as UTF-8 (no BOM, strict about invalid characters)
/// and writes it to the management socket.
/// </summary>
/// <param name="msg">Complete message text to send.</param>
/// <returns>
/// <c>true</c> when the send call completed; <c>false</c> when no socket has been set
/// or the send failed with a <see cref="SocketException"/> or
/// <see cref="ObjectDisposedException"/> (the failure is logged).
/// </returns>
protected virtual bool sendMgmtMessage(string msg)
{
    // Report a missing channel the same way as any other send failure instead of
    // letting a NullReferenceException escape to the caller.
    if (writeSocket == null)
    {
        Logger.Log(TraceEventType.Error, id + " - Failed sending the management message. The channel has not been connected.");
        return false;
    }

    byte[] bytes = new UTF8Encoding(false, true).GetBytes(msg);
    try
    {
        writeSocket.Send(bytes);
    }
    catch (SocketException ex)
    {
        // Prefixed with the session id, like the other failure path, so the log line
        // can be attributed to a session.
        Logger.Log(TraceEventType.Error, id + " - Failed sending the management message. " + ex.Message);
        return false;
    }
    catch (ObjectDisposedException ex)
    {
        Logger.Log(TraceEventType.Error, id + " - Failed sending the management message. " + ex.Message);
        return false;
    }

    return true;
}
/// <summary>
/// Gets the bandwidth cap.
/// </summary>
/// <returns>The current value of <c>bandwidthCapKbps</c>.</returns>
public ulong getBandwidthCapKbps()
{
    return this.bandwidthCapKbps;
}
/// <summary>
/// Gets the error description (the field defaults to an empty string).
/// </summary>
/// <returns>The current value of <c>errDescription</c>.</returns>
public string getErrDescription()
{
    return this.errDescription;
}
/// <summary>
/// Gets the error code.
/// </summary>
/// <returns>The current value of <c>errNum</c>.</returns>
public int getErrCode()
{
    return this.errNum;
}
/// <summary>
/// Gets the host.
/// </summary>
/// <returns>The current value of <c>Host</c>.</returns>
public string getHost()
{
    return this.Host;
}
/// <summary>
/// Gets the minimum rate. The misspelled method name is part of the public API and is kept as-is.
/// </summary>
/// <returns>The current value of <c>minimumRateKbps</c>.</returns>
public ulong getMinumumRateKbps()
{
    return this.minimumRateKbps;
}
/// <summary>
/// Gets the transfer policy.
/// </summary>
/// <returns>The current value of <c>Adaptive</c>.</returns>
public Policy getPolicy()
{
    return this.Adaptive;
}
/// <summary>
/// Gets the <c>Delay</c> field as a signed 32-bit value. The <c>rttMs</c> field is not consulted.
/// </summary>
/// <returns><c>Delay</c> converted with <see cref="Convert.ToInt32(uint)"/>.</returns>
/// <exception cref="OverflowException"><c>Delay</c> exceeds <see cref="int.MaxValue"/>.</exception>
public int getRttMs()
{
    int delay = Convert.ToInt32(this.Delay);
    return delay;
}
/// <summary>
/// Gets the start-time value captured when the session object was constructed.
/// </summary>
/// <returns>The value of <c>_startTime</c>.</returns>
public int getStartTime()
{
    return this._startTime;
}
/// <summary>
/// Gets the current session state.
/// </summary>
/// <returns>The current value of <c>state</c>.</returns>
public SessionState getState()
{
    return this.state;
}
/// <summary>
/// Gets the target rate.
/// </summary>
/// <returns>The current value of <c>targetRateKbps</c>.</returns>
public ulong getTargetRateKbps()
{
    return this.targetRateKbps;
}
/// <summary>
/// Gets the token.
/// </summary>
/// <returns>The current value of <c>Token</c>.</returns>
public string getToken()
{
    return this.Token;
}
/// <summary>
/// Gets the <c>ElapsedUsec</c> counter as a signed 64-bit value, without any scaling.
/// The <c>totalElapsedMs</c> field is not consulted.
/// </summary>
/// <returns><c>ElapsedUsec</c> converted with <see cref="Convert.ToInt64(ulong)"/>.</returns>
/// <exception cref="OverflowException">The counter exceeds <see cref="long.MaxValue"/>.</exception>
public long getTotalElapsedMs()
{
    // NOTE(review): the method name says milliseconds but the field name says
    // microseconds and no conversion happens here - confirm which unit callers expect.
    long elapsed = Convert.ToInt64(this.ElapsedUsec);
    return elapsed;
}
/// <summary>
/// Gets the lost-bytes counter.
/// </summary>
/// <returns>The current value of <c>totalLostBytes</c>.</returns>
public ulong getTotalLostBytes()
{
    return this.totalLostBytes;
}
/// <summary>
/// Gets the written-bytes counter, which is backed by the <c>FileBytes</c> field.
/// </summary>
/// <returns>The current value of <c>FileBytes</c>.</returns>
public ulong getTotalWrittenBytes()
{
    return this.FileBytes;
}
/// <summary>
/// Gets the user.
/// </summary>
/// <returns>The current value of <c>User</c>.</returns>
public string getUser()
{
    return this.User;
}
/// <summary>
/// Gets the UDP port.
/// </summary>
/// <returns>The current value of <c>udpPort</c>.</returns>
public uint getUdpPort()
{
    return this.udpPort;
}
/// <summary>
/// Gets the remote flag.
/// </summary>
/// <returns>The current value of <c>IsRemote</c>.</returns>
public bool isRemote()
{
    return this.IsRemote;
}
/// <summary>
/// Gets the upload flag (the field defaults to <c>true</c>).
/// </summary>
/// <returns>The current value of <c>IsUpload</c>.</returns>
public bool isUpload()
{
    return this.IsUpload;
}
/// <summary>
/// Sets the session cookie returned by getCookie().
/// </summary>
/// <param name="userCookie">New value for the <c>Cookie</c> field.</param>
public void setCookie(string userCookie)
{
    this.Cookie = userCookie;
}
/// <summary>
/// Overwrites the session identifier: the value is stored in <c>id</c>, which getId()
/// returns and which prefixes this class's log lines and outgoing management messages.
/// </summary>
/// <param name="userStr">New value for the <c>id</c> field.</param>
public void setUserStr(string userStr)
{
    this.id = userStr;
}
/// <summary>
/// Queues another source/destination pair on a persistent session by sending a
/// "START" management message.
/// </summary>
/// <param name="sourcePath">Value sent in the "Source" field.</param>
/// <param name="destPath">Value sent in the "Destination" field.</param>
/// <param name="startByte">Lower bound of the byte range; 0 leaves the lower bound empty.</param>
/// <param name="endByte">Upper bound of the byte range; 0 leaves the upper bound empty.</param>
/// <remarks>
/// A "FileBytes" field of the form <c>start:end</c> is added only when at least one
/// of the two bounds is non-zero.
/// </remarks>
/// <exception cref="FaspManagerException">
/// The session is not persistent, its state is FINISHED or FAILED, or no write socket has been set yet.
/// </exception>
public void addSource(string sourcePath, string destPath, long startByte, long endByte)
{
    Logger.Log(TraceEventType.Information, id + " - adding source: " + sourcePath + " dest: " + destPath);

    if (!persistent)
    {
        throw new FaspManagerException("Cannot add source path to a non persistent session");
    }

    bool alreadyOver = state == SessionState.FINISHED || state == SessionState.FAILED;
    if (alreadyOver)
    {
        throw new FaspManagerException("Transfer already completed. Cannot add path.");
    }

    if (writeSocket == null)
    {
        throw new FaspManagerException("The channel has not been connected. It is too early to add source path to the job.");
    }

    FaspMessage startRequest = new FaspMessage(id);
    startRequest.setType("START");
    startRequest.addField("Source", sourcePath);
    startRequest.addField("Destination", destPath);

    bool hasRange = startByte != 0 || endByte != 0;
    if (hasRange)
    {
        // A zero bound is sent as an empty string, e.g. ":500" or "100:".
        string lowerBound = (startByte == 0) ? "" : startByte.ToString();
        string upperBound = (endByte == 0) ? "" : endByte.ToString();
        startRequest.addField("FileBytes", lowerBound + ":" + upperBound);
    }

    // The send result is not inspected; failures are only logged by sendMgmtMessage.
    sendMgmtMessage(startRequest.constructMessage());
}
/// <summary>
/// Sets the persistent flag; addSource() only accepts new paths while it is true.
/// </summary>
/// <param name="persistent">New value for the <c>persistent</c> field.</param>
public void setPersistent(bool persistent)
{
this.persistent = persistent;
}
/// <summary>
/// Gets the flag that stopWhenIdle() sets once it has run.
/// </summary>
/// <returns>The current value of <c>persistentSessionLocked</c>.</returns>
public bool getPersistentSessionLocked()
{
    return this.persistentSessionLocked;
}
}
}