// FaspManager
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
namespace Aspera.Transfer
{
public class FaspManager
{
/// <summary>
/// Internal listener that relays each file-session event to every listener
/// currently held in the manager-wide <c>listenersList</c>.
/// </summary>
private class FaspManagerListener : FileTransferListener, TransferListener
{
    /// <summary>
    /// Forwards the event, unchanged, to each registered listener in list order.
    /// </summary>
    public void fileSessionEvent(TransferEvent _event, SessionStats sessionStats, FileStats fileStats)
    {
        // Dispatch over a private copy so the loop is not affected by
        // listeners being added or removed while events are delivered.
        ArrayList snapshot = new ArrayList(listenersList);
        foreach (object entry in snapshot) {
            ((FileTransferListener)entry).fileSessionEvent(_event, sessionStats, fileStats);
        }
    }
}
/// <summary>
/// State object passed to asynchronous receives: the socket being read
/// together with the buffer the received bytes are written into.
/// </summary>
private class CSocketPacket
{
    // Size in bytes of the per-receive buffer.
    private const int BufferSize = 32768;

    public byte[] dataBuffer = new byte[BufferSize];
    public Socket thisSocket;
}
// Singleton instance; assigned in getInstance() and reset to null in destroy().
private static FaspManager _fm = null;
// Receive callback; set to OnDataReceived on first use in WaitForData().
private AsyncCallback FmWorkerCallBack;
// Listening TCP socket created in startListen().
private Socket m_socListener;
// Thread running startListen(); null means the manager is not running (see isRunning()).
private Thread listenThread;
// When true, removeCompletedSession() also removes the session from sessionIdHash.
private bool autoRemoveCompletedTransferReferences = true;
// Passed to each FileTransferSession constructed by this manager.
private bool debugEnabled;
// Maps a connection Socket to its FileTransferSession.
private Hashtable _sessionHash;
// Maps a Socket to the text received so far while its first message is still incomplete.
private Hashtable pendingFirstMessageReadsHash;
// Maps a session id string to its FileTransferSession.
private Hashtable sessionIdHash;
// Maps a user string to a locally started session until a connection reports that string.
private Hashtable userStrHash;
// Set by stopListening() to make the accept loop in startListen() exit.
private bool _terminate;
// Manager-level listeners; FaspManagerListener forwards events to these.
private static ArrayList listenersList = ArrayList.Synchronized(new ArrayList());
// Added as a listener to every session bound to a socket in OnDataReceived().
private FaspManagerListener _listener = new FaspManagerListener();
// Signalled when an accept completes (or on shutdown) so startListen() can continue its loop.
private static ManualResetEvent ListenSync = new ManualResetEvent(false);
// Monitor used for the start()/startListen() startup handshake.
private static object syncObject = new object();
// Port the listening socket is bound to.
private int _port;
// Whether the port is published through the port file (see listenForServerSessions()).
private bool publishManagementPort;
/// <summary>
/// Gets or sets whether removeCompletedSession() also drops the finished
/// session from the id-to-session table.
/// </summary>
public bool AutoRemoveCompletedTransferReferences
{
    get { return autoRemoveCompletedTransferReferences; }
    set { autoRemoveCompletedTransferReferences = value; }
}
/// <summary>
/// Listener-thread entry point. Binds and listens on the management port,
/// wakes the thread waiting in start(), then accepts connections one at a
/// time until termination is requested.
/// </summary>
/// <exception cref="FaspManagerException">
/// Thrown when the listener cannot be set up or the accept loop fails for a
/// reason other than the socket being closed during shutdown.
/// </exception>
private void startListen()
{
    // Tracks whether the thread blocked in start() has already been woken.
    bool startupSignalled = false;
    try {
        m_socListener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        IPEndPoint localEP = new IPEndPoint(IPAddress.Any, Environment.getManagementPort());
        m_socListener.Bind(localEP);
        _port = ((IPEndPoint)m_socListener.LocalEndPoint).Port;
        lock (syncObject) {
            m_socListener.Listen(10);
            if (publishManagementPort)
                writePortFile(_port);
            Monitor.PulseAll(syncObject);
            startupSignalled = true;
            Logger.Log(TraceEventType.Information, "Listening on port: " + _port);
        }
        Environment.setManagementPort(_port);
        while (!isTerminate()) {
            // One accept at a time: OnClientConnect (or stopListening) sets the event.
            ListenSync.Reset();
            m_socListener.BeginAccept(OnClientConnect, null);
            ListenSync.WaitOne();
        }
    } catch (Exception ex) {
        if (isTerminate() && ex is ObjectDisposedException) {
            // endListen() closed the socket between the loop check and BeginAccept;
            // this is an orderly shutdown, not a failure.
            Logger.Log(TraceEventType.Verbose, "startListen: Listener socket closed during shutdown");
            return;
        }
        throw new FaspManagerException("Unable to start fasp manager: " + ex.Message, ex);
    } finally {
        if (!startupSignalled) {
            // Setup failed before the pulse: release start() so it does not wait forever.
            lock (syncObject) {
                Monitor.PulseAll(syncObject);
            }
        }
        // The socket is null when its constructor threw.
        if (m_socListener != null) {
            try {
                m_socListener.Shutdown(SocketShutdown.Both);
            } catch {
                Logger.Log(TraceEventType.Error, "m_socListener.Shutdown: Management socket has been disposed of");
            }
            m_socListener.Close();
        }
    }
}
/// <summary>
/// Writes the given port number as text to the file at
/// Environment.getPortFilePath(), creating the parent directory if needed.
/// </summary>
/// <param name="_port">Port number to write.</param>
/// <exception cref="FaspManagerException">Thrown when the file cannot be written.</exception>
private void writePortFile(int _port)
{
    string portFilePath = Environment.getPortFilePath();
    try {
        Logger.Log(TraceEventType.Information, "Writing the port file at " + portFilePath);
        string directoryName = Path.GetDirectoryName(portFilePath);
        // A bare file name has no directory component; there is nothing to create then.
        if (!string.IsNullOrEmpty(directoryName) && !Directory.Exists(directoryName))
            Directory.CreateDirectory(directoryName);
        // using blocks release the file handle even when the write throws.
        using (FileStream fileStream = new FileStream(portFilePath, FileMode.Create, FileAccess.Write))
        using (StreamWriter streamWriter = new StreamWriter(fileStream)) {
            streamWriter.Write(_port.ToString());
        }
    } catch (Exception ex) {
        throw new FaspManagerException("Failed writing port file: " + ex.Message, ex);
    }
}
/// <summary>
/// Deletes the file located at Environment.getPortFilePath().
/// </summary>
private void removePortFile()
{
    new FileInfo(Environment.getPortFilePath()).Delete();
}
/// <summary>Returns the current value of the termination flag.</summary>
private bool isTerminate() => _terminate;
/// <summary>
/// Accept callback. Completes the pending accept, releases the accept loop so
/// it can wait for the next connection, and starts reading from the new socket.
/// </summary>
/// <param name="asyn">Result of the BeginAccept call issued in startListen().</param>
private void OnClientConnect(IAsyncResult asyn)
{
    Socket soc;
    try {
        soc = m_socListener.EndAccept(asyn);
    } catch (ObjectDisposedException arg) {
        Logger.Log(TraceEventType.Information, "OnClientConnect: Listener Socket has been closed - " + arg);
        return;
    } catch (SocketException ex) {
        Logger.Log(TraceEventType.Error, "OnClientConnect: Listener socket exception. Code - " + ex.ErrorCode);
        // No connection was obtained, so there is nothing to read from. Release
        // the accept loop so the next BeginAccept is issued instead of hanging.
        ListenSync.Set();
        return;
    }
    try {
        Logger.Log(TraceEventType.Verbose, "Accepted connection from ascp process.");
        ListenSync.Set();
        WaitForData(soc);
    } catch (ObjectDisposedException) {
        Logger.Log(TraceEventType.Error, "OnClientConnect: Management socket has been closed");
    } catch (SocketException ex3) {
        Logger.Log(TraceEventType.Error, "OnClientConnect: Management socket exception. Code - " + ex3.ErrorCode);
    }
}
/// <summary>
/// Private constructor: creates the four lookup tables, each wrapped in a
/// synchronized Hashtable. Instances are obtained through getInstance().
/// </summary>
private FaspManager()
{
    // Lookup by session id and by user string.
    sessionIdHash = Hashtable.Synchronized(new Hashtable());
    userStrHash = Hashtable.Synchronized(new Hashtable());

    // Lookup by connection socket.
    _sessionHash = Hashtable.Synchronized(new Hashtable());
    pendingFirstMessageReadsHash = Hashtable.Synchronized(new Hashtable());
}
/// <summary>
/// Starts an asynchronous receive on the socket; OnDataReceived is invoked
/// when data arrives, with a fresh CSocketPacket as its state object.
/// </summary>
/// <param name="soc">Socket to read from.</param>
private void WaitForData(Socket soc)
{
    // The callback delegate is created once and reused for every receive.
    if (FmWorkerCallBack == null) {
        FmWorkerCallBack = new AsyncCallback(OnDataReceived);
    }

    CSocketPacket packet = new CSocketPacket { thisSocket = soc };
    byte[] buffer = packet.dataBuffer;
    soc.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, FmWorkerCallBack, packet);
}
/// <summary>
/// Receive callback. Decodes the received bytes as UTF-8, resolves (or creates)
/// the session that owns the socket once its first message is complete, hands
/// the text to that session, and re-arms the receive. A zero-byte read is
/// treated as the peer closing the connection and triggers cleanup.
/// </summary>
/// <param name="asyn">Result whose AsyncState is the CSocketPacket created in WaitForData().</param>
private void OnDataReceived(IAsyncResult asyn)
{
    if (asyn == null) {
        Logger.Log(TraceEventType.Error, "Null was passed to OnDataReceived(IAsyncResult asyn)");
        return;
    }
    CSocketPacket cSocketPacket = (CSocketPacket)asyn.AsyncState;
    try {
        int num = cSocketPacket.thisSocket.EndReceive(asyn);
        if (num == 0) {
            // Connection closed. Drop any partial first message buffered for this
            // socket so the entry does not outlive the connection.
            pendingFirstMessageReadsHash.Remove(cSocketPacket.thisSocket);
            if (_sessionHash.Contains(cSocketPacket.thisSocket)) {
                Session session = (FileTransferSession)_sessionHash[cSocketPacket.thisSocket];
                string id = session.id;
                session.close();
                sessionIdHash.Remove(id);
                _sessionHash.Remove(cSocketPacket.thisSocket);
                Logger.Log(TraceEventType.Information, "SessionID: " + id + " ascp exited: Cleaned up the session");
            }
            cSocketPacket.thisSocket.Close();
            return;
        }

        // UTF-8 never yields more chars than bytes, so num is a safe capacity.
        // Only the chars actually produced go into the string; using the whole
        // array would append '\0' padding whenever multi-byte characters occur.
        char[] array = new char[num];
        Decoder decoder = Encoding.UTF8.GetDecoder();
        int charCount = decoder.GetChars(cSocketPacket.dataBuffer, 0, num, array, 0);
        string text = new string(array, 0, charCount);

        FileTransferSession fileTransferSession = null;
        if (_sessionHash.Contains(cSocketPacket.thisSocket))
            fileTransferSession = (FileTransferSession)_sessionHash[cSocketPacket.thisSocket];
        else {
            // Prepend whatever was buffered from earlier partial reads.
            if (pendingFirstMessageReadsHash.Contains(cSocketPacket.thisSocket)) {
                string str = (string)pendingFirstMessageReadsHash[cSocketPacket.thisSocket];
                text = str + text;
                pendingFirstMessageReadsHash.Remove(cSocketPacket.thisSocket);
            }
            if (isFirstMessageObtained(text)) {
                string userStr = getUserStr(text);
                if (userStrHash.Contains(userStr)) {
                    // Connection belongs to a session started through this manager.
                    fileTransferSession = (FileTransferSession)userStrHash[userStr];
                    userStrHash.Remove(userStr);
                } else {
                    // Unknown user string: register a new session flagged as other-initiated.
                    fileTransferSession = new FileTransferSession(debugEnabled);
                    fileTransferSession.setFM(this);
                    fileTransferSession.IsOtherInitiated = true;
                    string text2 = Guid.NewGuid().ToString();
                    fileTransferSession.setUserStr(text2);
                    sessionIdHash.Add(text2, fileTransferSession);
                }
                _sessionHash.Add(cSocketPacket.thisSocket, fileTransferSession);
                fileTransferSession.setWriteSocket(cSocketPacket.thisSocket);
                fileTransferSession.addListener(_listener);
            } else
                // First message still incomplete; keep the text for the next read.
                pendingFirstMessageReadsHash.Add(cSocketPacket.thisSocket, text);
        }
        fileTransferSession?.processReadData(text);
        WaitForData(cSocketPacket.thisSocket);
    } catch (Exception ex) {
        string text3 = ex.Message;
        Session session2 = (Session)_sessionHash[cSocketPacket.thisSocket];
        if (session2 != null) {
            if (session2.getId() != null)
                text3 = session2.getId() + " - " + text3;
            text3 = text3 + " Session ID: " + session2.getSessionId();
            session2.close();
            session2.handleAscpCrash();
        }
        Logger.Log(TraceEventType.Error, "Error reading data over management connection: " + text3);
    }
}
/// <summary>
/// Returns true when the text contains two consecutive line feeds.
/// </summary>
/// <param name="dataString">Text accumulated so far for a connection.</param>
private bool isFirstMessageObtained(string dataString)
{
    return dataString.Contains("\n\n");
}
/// <summary>
/// Extracts the value following the "UserStr: " marker from a message made of
/// '\n'-separated lines. When several lines carry the marker, the last one wins.
/// </summary>
/// <param name="dataString">Message text to scan.</param>
/// <returns>The text after the marker, or an empty string when no line contains it.</returns>
private string getUserStr(string dataString)
{
    const string marker = "UserStr: ";
    string result = "";
    string[] lines = dataString.Split(new char[1] { '\n' }, StringSplitOptions.RemoveEmptyEntries);
    foreach (string line in lines) {
        // Search for the exact marker that is used to compute the offset. Testing
        // for a shorter token would give a bogus offset (or an out-of-range
        // Substring) on lines that mention "UserStr" without the full marker.
        int markerAt = line.IndexOf(marker, StringComparison.Ordinal);
        if (markerAt >= 0)
            result = line.Substring(markerAt + marker.Length);
    }
    return result;
}
/// <summary>
/// Returns the shared FaspManager, creating and starting it on first use.
/// </summary>
/// <exception cref="FaspManagerException">
/// Thrown when the manager fails to start; the original failure is kept as
/// the inner exception and the shared instance is reset so a later call can retry.
/// </exception>
public static FaspManager getInstance()
{
    if (_fm != null)
        return _fm;

    // _fm must be assigned before start(): start() hands _fm.startListen to the listener thread.
    FaspManager faspManager = _fm = new FaspManager();
    try {
        faspManager.start();
    } catch (Exception ex) {
        _fm = null;
        // Pass ex along so the root cause and its stack trace are not lost.
        throw new FaspManagerException("Failed to initialize Fasp Manager: " + ex.Message, ex);
    }
    return _fm;
}
/// <summary>Returns true while a listener thread reference is held.</summary>
public bool isRunning() => listenThread != null;
/// <summary>
/// Starts the listener thread unless one is already running, and blocks until
/// that thread pulses <c>syncObject</c>.
/// </summary>
/// <exception cref="FaspManagerException">Thrown when no ascp path is configured.</exception>
private void start()
{
    if (Environment.getFaspScpPath() == null || Environment.getFaspScpPath().Length == 0) {
        throw new FaspManagerException("ascp.exe not found");
    }
    if (isRunning()) {
        return;
    }

    lock (syncObject) {
        _terminate = false;
        Logger.Log(TraceEventType.Information, "Starting Aspera Fasp Manager. Version " + getReleaseVersion());
        Thread worker = new Thread(_fm.startListen);
        listenThread = worker;
        worker.Start();
        // Released by the PulseAll issued from startListen().
        Monitor.Wait(syncObject);
    }
}
/// <summary>Returns the key collection of the id-to-session table.</summary>
public ICollection getSessionIDList() => sessionIdHash.Keys;
/// <summary>
/// Returns the statistics of the session registered under the given id.
/// </summary>
/// <param name="jobId">Session id as returned by startTransfer.</param>
/// <exception cref="FaspManagerException">Thrown when no session has that id.</exception>
public SessionStats getSessionStats(string jobId)
{
    // Single lookup: a Contains check followed by the indexer can race with a
    // removal on a socket callback thread and dereference null.
    FileTransferSession fileTransferSession = sessionIdHash[jobId] as FileTransferSession;
    if (fileTransferSession == null)
        throw new FaspManagerException("No session found with the specified ID: " + jobId);
    return fileTransferSession.getSessionStats();
}
/// <summary>
/// Attaches a listener to the session registered under the given id.
/// </summary>
/// <param name="jobId">Session id.</param>
/// <param name="listener">Listener to add to that session.</param>
/// <exception cref="FaspManagerException">Thrown when no session has that id.</exception>
public void addJobListener(string jobId, FileTransferListener listener)
{
    // Single lookup avoids a null dereference if the session is removed
    // between an existence check and the fetch.
    FileTransferSession fileTransferSession = sessionIdHash[jobId] as FileTransferSession;
    if (fileTransferSession == null)
        throw new FaspManagerException("No session found with the specified ID: " + jobId);
    Logger.Log(TraceEventType.Information, "SessionID: " + jobId + " Adding listener");
    fileTransferSession.addListener(listener);
}
/// <summary>
/// Detaches a listener from the session registered under the given id.
/// </summary>
/// <param name="jobId">Session id.</param>
/// <param name="listener">Listener to remove from that session.</param>
/// <exception cref="FaspManagerException">Thrown when no session has that id.</exception>
public void removeJobListener(string jobId, FileTransferListener listener)
{
    // Single lookup avoids a null dereference if the session is removed
    // between an existence check and the fetch.
    FileTransferSession fileTransferSession = sessionIdHash[jobId] as FileTransferSession;
    if (fileTransferSession == null)
        throw new FaspManagerException("No session found with the specified ID: " + jobId);
    // The log line previously said "Adding listener", copied from addJobListener.
    Logger.Log(TraceEventType.Information, "SessionID: " + jobId + " Removing listener");
    fileTransferSession.removeListener(listener);
}
/// <summary>
/// Returns the statistics of the first socket-bound session whose Cookie
/// equals the given value.
/// </summary>
/// <param name="cookie">Cookie value to match.</param>
/// <exception cref="FaspManagerException">Thrown when no session matches.</exception>
public SessionStats getSessionStatsByCookie(string cookie)
{
    // A synchronized Hashtable does not make enumeration safe. Copy the values
    // under the table's SyncRoot so concurrent adds/removes from socket
    // callbacks cannot invalidate the enumerator.
    ArrayList sessions;
    lock (_sessionHash.SyncRoot) {
        sessions = new ArrayList(_sessionHash.Values);
    }
    foreach (FileTransferSession item in sessions) {
        if (item.Cookie == cookie)
            return item.getSessionStats();
    }
    throw new FaspManagerException("No session found with the specified cookie: " + cookie);
}
/// <summary>
/// Writes the port file when switched on, deletes it when switched off, and
/// then records the new setting.
/// </summary>
/// <param name="On">True to publish the port file, false to remove it.</param>
public void listenForServerSessions(bool On)
{
    if (!On) {
        removePortFile();
    } else {
        writePortFile(_port);
    }
    // Reached only when the file operation above did not throw.
    publishManagementPort = On;
}
/// <summary>Returns the setting last stored by listenForServerSessions().</summary>
public bool isListeningForServerSessions() => publishManagementPort;
/// <summary>
/// Stores the debug flag that is passed to sessions created afterwards.
/// </summary>
/// <param name="enabled">New value of the debug flag.</param>
public void enableFaspDebug(bool enabled) => debugEnabled = enabled;
/// <summary>Returns the TraceSource exposed by Logger.</summary>
public TraceSource getTraceSource() => Logger.getTraceSource();
/// <summary>
/// Returns the version of the assembly that contains FaspManager.
/// </summary>
public Version getReleaseVersion()
{
    var assemblyName = typeof(FaspManager).Assembly.GetName();
    return assemblyName.Version;
}
/// <summary>
/// Shuts down the shared manager: cancels its sessions, stops the listener
/// thread and clears the shared instance. Logs a warning and does nothing when
/// there is no running instance.
/// </summary>
public static void destroy()
{
    // Nothing to destroy when no instance exists or it is not running. The
    // original test (_fm != null && !running) sent a null _fm into the
    // teardown branch, where it was dereferenced.
    if (_fm == null || !_fm.isRunning()) {
        Logger.Log(TraceEventType.Warning, "Destroy method called. But this Fasp Manager instance is already destroyed.");
        return;
    }
    Logger.Log(TraceEventType.Information, "Destroying the fasp manager instance.");
    _fm.cleanUpSessions();
    _fm.stopListening();
    _fm = null;
}
/// <summary>
/// Removes the given id from the shared manager's id-to-session table.
/// </summary>
/// <param name="sessionId">Session id to forget.</param>
/// <returns>True when the id was present and has been removed; otherwise false.</returns>
public bool removeTransferReference(string sessionId)
{
    bool known = _fm != null
        && _fm.sessionIdHash != null
        && _fm.sessionIdHash.ContainsKey(sessionId);
    if (!known) {
        return false;
    }
    _fm.sessionIdHash.Remove(sessionId);
    return true;
}
/// <summary>
/// Requests termination, closes the listening socket, releases the accept
/// loop and waits for the listener thread to finish.
/// </summary>
private void stopListening()
{
    Logger.Log(TraceEventType.Verbose, "Stopping listening thread.");

    // The flag must be set before the accept loop is released so the loop exits.
    _terminate = true;
    endListen();
    ListenSync.Set();

    Thread worker = listenThread;
    worker.Join();
    listenThread = null;
}
/// <summary>
/// Cancels every socket-bound session that is neither finished nor remote,
/// then empties the socket-to-session table.
/// </summary>
private void cleanUpSessions()
{
    Logger.Log(TraceEventType.Verbose, "Cleaning job and session hash tables.");
    // Work on a copy taken under the table's SyncRoot: cancelling a session or a
    // concurrent socket callback may modify _sessionHash, which would invalidate
    // an enumerator over the live Values collection.
    ArrayList sessions;
    lock (_sessionHash.SyncRoot) {
        sessions = new ArrayList(_sessionHash.Values);
    }
    foreach (Session item in sessions) {
        if (item == null || item.isFinished() || item.isRemote())
            continue;
        try {
            item.cancel();
        } catch (FaspManagerException ex) {
            // Best effort: keep cancelling the remaining sessions.
            Logger.Log(TraceEventType.Warning, "Error canceling session - " + ex.Message);
        }
    }
    _sessionHash.Clear();
}
/// <summary>
/// Drops the session bound to the socket from the socket table and, when
/// auto-removal is enabled, from the id table as well. Any failure is logged
/// as a warning rather than propagated.
/// </summary>
/// <param name="sock">Socket of the completed session.</param>
internal void removeCompletedSession(Socket sock)
{
    try {
        bool managerReady = _fm != null && _fm.sessionIdHash != null;
        if (!managerReady) {
            return;
        }
        if (autoRemoveCompletedTransferReferences) {
            FileTransferSession finished = (FileTransferSession)_sessionHash[sock];
            _fm.sessionIdHash.Remove(finished.id);
        }
        _fm._sessionHash.Remove(sock);
    } catch (Exception failure) {
        Logger.Log(TraceEventType.Warning, "Error removing completed session: " + failure.Message);
    }
}
/// <summary>
/// Closes the listening socket (disconnecting it first if it reports being
/// connected) and, when the port file is published, tries to delete it.
/// </summary>
private void endListen()
{
    Logger.Log(TraceEventType.Verbose, "Closing the server socket");

    Socket listener = m_socListener;
    if (listener.Connected) {
        listener.Disconnect(true);
    }
    listener.Close();

    if (!publishManagementPort) {
        return;
    }
    try {
        removePortFile();
    } catch (Exception failure) {
        // Failing to delete the file must not abort shutdown.
        Logger.Log(TraceEventType.Error, "Unable to delete the port file." + failure.Message);
    }
}
/// <summary>
/// Registers a manager-level listener by appending it to the shared listener list.
/// </summary>
/// <param name="listener">Listener to register.</param>
public void addListener(TransferListener listener)
{
    const string logLine = "Adding a manager level listener";
    Logger.Log(TraceEventType.Information, logLine);
    listenersList.Add(listener);
}
/// <summary>
/// Unregisters a manager-level listener by removing it from the shared listener list.
/// </summary>
/// <param name="listener">Listener to unregister.</param>
public void removeListener(TransferListener listener)
{
    const string logLine = "Removing a manager level listener";
    Logger.Log(TraceEventType.Information, logLine);
    listenersList.Remove(listener);
}
/// <summary>
/// Starts a single-process transfer and returns its session id.
/// </summary>
/// <param name="order">Job to transfer.</param>
/// <param name="listener">Optional listener passed to the session.</param>
/// <returns>The id of the started session.</returns>
public string startTransfer(JobOrder order, FileTransferListener listener = null)
{
    List<string> sessionIds = startTransfer(order, 1, listener);
    return sessionIds[0];
}
/// <summary>
/// Starts a single-process transfer with a caller-supplied transfer id and
/// retry count, and returns its session id.
/// </summary>
/// <param name="order">Job to transfer.</param>
/// <param name="xferId">Transfer id set on the session.</param>
/// <param name="xferRetry">Retry count set on the session.</param>
/// <param name="listener">Optional listener passed to the session.</param>
/// <returns>The id of the started session.</returns>
public string startTransfer(JobOrder order, Guid xferId, int xferRetry = 0, FileTransferListener listener = null)
{
    List<string> sessionIds = startTransfer(order, xferId, xferRetry, 1, listener);
    return sessionIds[0];
}
/// <summary>
/// Starts a transfer split over the given number of processes, using a newly
/// generated transfer id and a retry count of zero.
/// </summary>
/// <param name="order">Job to transfer.</param>
/// <param name="nbProcesses">Number of sessions to start.</param>
/// <param name="listener">Optional listener passed to each session.</param>
/// <returns>The ids of the started sessions.</returns>
public List<string> startTransfer(JobOrder order, int nbProcesses, FileTransferListener listener = null)
{
    Guid freshXferId = Guid.NewGuid();
    return startTransfer(order, freshXferId, 0, nbProcesses, listener);
}
/// <summary>
/// Starts one session per process (process indexes 1..nbProcesses) for the
/// given job and returns the session ids in start order.
/// </summary>
/// <param name="order">Job to transfer.</param>
/// <param name="xferId">Transfer id set on every session.</param>
/// <param name="xferRetry">Retry count set on every session.</param>
/// <param name="nbProcesses">Number of sessions to start.</param>
/// <param name="listener">Optional listener passed to each session.</param>
/// <exception cref="FaspManagerException">
/// Thrown when more than one process is requested for a persistent job.
/// </exception>
public List<string> startTransfer(JobOrder order, Guid xferId, int xferRetry = 0, int nbProcesses = 1, FileTransferListener listener = null)
{
    bool multiProcess = nbProcesses > 1;
    if (multiProcess && order.getXferParams().persist) {
        throw new FaspManagerException("Cannot do multi process transfer with persistent session");
    }

    List<string> sessionIds = new List<string>();
    int processIndex = 1;
    while (processIndex <= nbProcesses) {
        string sessionId = doStartSession(order, xferId, xferRetry, processIndex, nbProcesses, listener);
        sessionIds.Add(sessionId);
        processIndex++;
    }
    return sessionIds;
}
/// <summary>
/// Creates, registers and starts one transfer session under a newly generated id.
/// </summary>
/// <param name="order">Job to transfer.</param>
/// <param name="xferId">Transfer id set on the session.</param>
/// <param name="xferRetry">Retry count set on the session.</param>
/// <param name="processIndex">Index of this process within the transfer.</param>
/// <param name="nbProcesses">Total number of processes in the transfer.</param>
/// <param name="listener">Listener passed to the session constructor.</param>
/// <returns>The generated session id.</returns>
private string doStartSession(JobOrder order, Guid xferId, int xferRetry, int processIndex, int nbProcesses, FileTransferListener listener)
{
    string text = Guid.NewGuid().ToString();
    FileTransferSession fileTransferSession = new FileTransferSession(order, listener, text, debugEnabled);
    fileTransferSession.setXferId(xferId);
    fileTransferSession.setXferRetry(xferRetry);
    fileTransferSession.setProcessIndex(processIndex);
    fileTransferSession.setNbProcesses(nbProcesses);
    fileTransferSession.setFM(this);
    if (order.getXferParams().persist)
        fileTransferSession.setPersistent(true);
    // Register before starting so an incoming connection that reports this id
    // can be matched in OnDataReceived().
    sessionIdHash.Add(text, fileTransferSession);
    userStrHash.Add(text, fileTransferSession);
    try {
        fileTransferSession.start(_port);
    } catch {
        // The caller never receives this id when start fails, so the entries
        // could not be removed later; undo the registration before rethrowing.
        sessionIdHash.Remove(text);
        userStrHash.Remove(text);
        throw;
    }
    return text;
}
/// <summary>
/// Forwards a rate change to the session registered under the given id.
/// </summary>
/// <param name="jobId">Session id.</param>
/// <param name="targetRateKbps">Target rate passed to the session.</param>
/// <param name="minRateKbps">Minimum rate passed to the session.</param>
/// <param name="policy">Policy passed to the session.</param>
/// <exception cref="FaspManagerException">Thrown when no session has that id.</exception>
public void setRate(string jobId, int targetRateKbps, int minRateKbps, Policy policy)
{
    // Single lookup avoids a null dereference if the session is removed
    // between an existence check and the fetch.
    FileTransferSession fileTransferSession = sessionIdHash[jobId] as FileTransferSession;
    if (fileTransferSession == null)
        throw new FaspManagerException("Rate Change Failed: Specified job id not found");
    fileTransferSession.setRate(targetRateKbps, minRateKbps, policy);
}
/// <summary>
/// Calls cancel() on the session registered under the given id.
/// </summary>
/// <param name="jobId">Session id.</param>
/// <exception cref="FaspManagerException">Thrown when no session has that id.</exception>
public void cancelTransfer(string jobId)
{
    // Single lookup avoids a null dereference if the session is removed
    // between an existence check and the fetch.
    FileTransferSession fileTransferSession = sessionIdHash[jobId] as FileTransferSession;
    if (fileTransferSession == null)
        throw new FaspManagerException("Cancel Failed: Specified job id not found");
    fileTransferSession.cancel();
}
/// <summary>
/// Calls stop() on the session registered under the given id.
/// </summary>
/// <param name="jobId">Session id.</param>
/// <exception cref="FaspManagerException">Thrown when no session has that id.</exception>
public void stopTransfer(string jobId)
{
    // Single lookup avoids a null dereference if the session is removed
    // between an existence check and the fetch.
    FileTransferSession fileTransferSession = sessionIdHash[jobId] as FileTransferSession;
    if (fileTransferSession == null)
        throw new FaspManagerException("Stop Failed: Specified job id not found");
    fileTransferSession.stop();
}
/// <summary>
/// Calls terminate() on the session registered under the given id.
/// </summary>
/// <param name="jobId">Session id.</param>
/// <exception cref="FaspManagerException">Thrown when no session has that id.</exception>
public void terminateTransfer(string jobId)
{
    // Single lookup avoids a null dereference if the session is removed
    // between an existence check and the fetch.
    FileTransferSession fileTransferSession = sessionIdHash[jobId] as FileTransferSession;
    if (fileTransferSession == null)
        // The message previously read "Stop Failed", copied from stopTransfer.
        throw new FaspManagerException("Terminate Failed: Specified job id not found");
    fileTransferSession.terminate();
}
/// <summary>
/// Calls stopWhenIdle() on the session registered under the given id.
/// </summary>
/// <param name="jobId">Session id.</param>
/// <exception cref="FaspManagerException">Thrown when no session has that id.</exception>
public void lockPersistentSession(string jobId)
{
    // Single lookup avoids a null dereference if the session is removed
    // between an existence check and the fetch.
    FileTransferSession fileTransferSession = sessionIdHash[jobId] as FileTransferSession;
    if (fileTransferSession == null)
        // The message previously read "Stop Failed", copied from stopTransfer.
        throw new FaspManagerException("Lock Failed: Specified job id not found");
    fileTransferSession.stopWhenIdle();
}
/// <summary>
/// Adds a source path (with a byte range) to the session registered under the given id.
/// </summary>
/// <param name="jobId">Session id.</param>
/// <param name="sourcePath">Source path passed to the session.</param>
/// <param name="destPath">Destination path passed to the session.</param>
/// <param name="startByte">Start byte passed to the session.</param>
/// <param name="endByte">End byte passed to the session.</param>
/// <exception cref="FaspManagerException">
/// Thrown when no session has that id, or when the session reports that its
/// persistent session is locked.
/// </exception>
public void addSource(string jobId, string sourcePath, string destPath, long startByte, long endByte)
{
    // Single lookup avoids a null dereference if the session is removed
    // between an existence check and the fetch.
    FileTransferSession fileTransferSession = sessionIdHash[jobId] as FileTransferSession;
    if (fileTransferSession == null)
        throw new FaspManagerException("AddSource Failed: Specified job id not found");
    if (fileTransferSession.getPersistentSessionLocked())
        throw new FaspManagerException("Cannot add source path after invoking command stopWhenIdle");
    fileTransferSession.addSource(sourcePath, destPath, startByte, endByte);
}
/// <summary>
/// Adds a source path to the session with both byte offsets set to zero.
/// </summary>
/// <param name="id">Session id.</param>
/// <param name="sourcePath">Source path passed to the session.</param>
/// <param name="destPath">Destination path passed to the session.</param>
public void addSource(string id, string sourcePath, string destPath)
{
    const long noOffset = 0L;
    addSource(id, sourcePath, destPath, noOffset, noOffset);
}
}
}