実現したいこと
使用言語:C#
開発環境:Visual Studio 2022
TextBoxで指定したフォルダを監視し、対象フォルダ内にファイルが投入された場合に、
それを入力としてバッチファイルを作成・起動するWindowsFormsアプリケーションを開発しています。
バッチファイルでは更に別のアプリケーションを起動しており、そちらがスレッドにより同時起動できるため、非同期処理をしています。
バッチ起動後はエラーコードを取得し、正常終了かエラー終了かに応じて入力ファイルを移動しています。これは、処理が完了したことを知るためと、同一ファイルを二重に処理しないためという理由によります。
また、フォルダ監視にはFileSystemWatcherを使用していますが、ファイルの新規作成や更新が一度だけでもイベントが複数回発生することがあります。
それに対処するため、Reactive ExtensionsのThrottleにて待機し、発生したイベントをまとめています。別ファイルはまとめたくないため、対象のファイル名でグループ化も行っています。
発生している問題・分からないこと
やりたいことは実現できているのですが、再現性の低い問題がいくつか発生しています。
以下を回避するにはどうすればいいか、助言をいただけると幸いです。
1.バッチファイル内にoutputFileの記述がされないことがある
監視を開始してから複数のファイルが投入された場合、バッチファイルの中身は
入力ファイルを除いて同じになるはずです。それらはTextBoxで指定した値を
使用しているためです。
実際に多くの場合はその結果が得られているのですが、時々outputFileが
記述されないことがあります。
逆に、一つのバッチファイル内にoutputFileが二回書かれていたこともありました。
しかし、問題が発生してからもう一度同じファイルを投入すると
今度は発生しない等、発生しない場合も多々あります。
入力ファイルサイズが小さく、処理が即座に完了する場合に発生しやすいように
感じています。開発機では全く再現しないのですが、逆にファイルサイズが
大きい場合に発生するPCもあります。
2.一つの入力ファイルに対してcreateBatが二回発生することがある
そうならないようにRXでまとめているつもりなのですが、
ファイルを一つ投入したら2回createBatが動くことがあります。
これは特定PCでのみ発生しており、他PCでは再現しません。
また、全てのファイルが二重に処理されるわけではなく、一部のみです。
特定ファイルでのみ起きているわけでもありません。
3.outputFileの値が重複することがある
outputFileはTextBoxの値+実行日時(yyyyMMddHHmmss)+スレッド名としていたのですが、
これが重複することがありました。
Task.Delayで待機した上でミリ秒まで加えたら回避できました。
処理が1秒未満で完了すれば発生しうるのかもしれませんが、同一スレッドで
一つ前の処理と秒まで一致することがそんなにあるのだろうか…と少々腑に落ちません。
実は、別バッチのファイル名を使っているのではないかと懸念しています。
事象は回避できていますし、実際に処理が即完了する際に発生しているので、
上記の対応で問題なければ本項目はそれで結構です。
該当のソースコード
C#
1using Microsoft.VisualBasic.ApplicationServices; 2using Microsoft.VisualBasic.Logging; 3using System.Diagnostics; 4using System.Formats.Tar; 5using System.Text; 6using System.IO; 7using static System.Net.Mime.MediaTypeNames; 8using static System.Runtime.CompilerServices.RuntimeHelpers; 9using System; 10using System.Reactive.Linq; 11using System.Threading; 12 13 14namespace マルチスレッドSample { 15 16 public partial class Form1 : Form { 17 public Form1() { 18 InitializeComponent(); 19 } 20 21 //監視フォルダ内でファイルが更新、新規作成されたら実行 22 public string createBat(string inputFile,string outputFile,string directoryName) { 23 DateTime productionDT = DateTime.Now; 24 string thNo = "-" + Thread.CurrentThread.ManagedThreadId.ToString(); 25 26 //引数outputFileに実行日時と実行スレッドを付与して出力ファイル名を作成 27 string[] outputDTArr = outputFile.Split("."); 28 string outputDT = outputDTArr[0] + "_" + productionDT.ToString("yyyyMMddHHmmssfff") + thNo + "." + outputDTArr[1]; 29 30 Task t = Task.Delay(1000); 31 t.Wait(); 32 33 //Shift_JIS指定用 34 Encoding.RegisterProvider(CodePagesEncodingProvider.Instance); 35 36 //処理済ファイル保存用フォルダが存在しなければ作成 37 if(!Directory.Exists(directoryName + "\\Processed")){ 38 Directory.CreateDirectory(directoryName + "\\Processed"); 39 } 40 41 //エラーファイル保存用フォルダが存在しなければ作成 42 if(!Directory.Exists(directoryName + "\\Error")){ 43 Directory.CreateDirectory(directoryName + "\\Error"); 44 } 45 46 //バッチ作成 47 string batFileName = "マルチスレッドsample_" + productionDT.ToString("yyyyMMddHHmmssfff") + thNo + ".bat"; 48 using(var sw = new StreamWriter(batFileName,false,Encoding.GetEncoding("shift_jis"))) { 49 //別アプリケーションで必要な記述 50 sw.WriteLine("-input = " + inputFile); 51 sw.WriteLine("-output = " + outputDT); 52 sw.WriteLine("@echo errorlevel:%errorlevel%"); 53 } 54 55 //ProcessStartInfoインスタンスを生成 56 ProcessStartInfo processInfo = new ProcessStartInfo(batFileName){ 57 CreateNoWindow = true, 58 RedirectStandardOutput = true, 59 RedirectStandardError = true, 60 UseShellExecute = false 61 }; 62 //バッチ実行 63 Process process = Process.Start(processInfo); 64 //バッチファイルの実行結果を変数outputに格納 65 string output = process.StandardOutput.ReadToEnd(); 66 //C#側の処理を待機 67 process.WaitForExit(); 68 69 //エラーレベルを取得するための配列 70 string[] arr = output.Split("errorlevel:"); 71 string errorLevel = arr[1].Substring(0,1); 72 73 try{ 74 //実行したバッチファイルをバックアップとして保存 75 File.Move(batFileName,".\\Backup\\" + batFileName,true); 76 77 //正常終了したら入力ファイルをProcessedフォルダへ、それ以外ならErrorフォルダへ移動 78 if(errorLevel == "0"){ 79 File.Move(inputFile,directoryName + ".\\Processed\\" + Path.GetFileName(inputFile),true); 80 }else{ 81 File.Move(inputFile,directoryName + ".\\Error\\" + Path.GetFileName(inputFile),true); 82 } 83 84 }catch(Exception ex){ 85 //例外が発生したら処理を終了 86 process.Kill(); 87 } 88 89 //呼び出し元にエラーコードを返す 90 return errorLevel; 91 } 92 93 private FileSystemWatcher watcher = null; 94 95 private void button1_Click(object sender,EventArgs e) { 96 watcher = new FileSystemWatcher(); 97 98 watcher.Path = textBoxDirectoryName.Text; 99 watcher.NotifyFilter = 100 NotifyFilters.FileName 101 | NotifyFilters.DirectoryName 102 | NotifyFilters.LastWrite 103 | NotifyFilters.LastAccess; 104 105 watcher.Filter = ""; 106 watcher.SynchronizingObject = this; 107 108 //実行数の上限を設定 109 var semaphore = new SemaphoreSlim(3,3); 110 111 //FileSystemWatcherのイベントは更新一回に対して二回発生するため、RxのThrottleで1秒待機して 112 //その間に発生したイベントをまとめる。また、複数ファイルが投入された場合に対応するため、 113 //GroupByにてファイル名でグループ化する 114 watcher.ChangedAsObservable() 115 //FullPathでグループ化 116 .GroupBy(i => i.FullPath) 117 .Subscribe(g => { 118 string exe = Path.GetExtension(g.Key.ToString()); 119 //グループ(FullPathの値)ごとに1秒待機 120 g.Throttle(TimeSpan.FromSeconds(1)) 121 .Where(e => exe == ".csv" || exe == ".txt" || exe == ".dat") 122 .Subscribe(async i =>{ 123 await semaphore.WaitAsync().ConfigureAwait(false); 124 try{ 125 await Task.Run(() => { 126 string result = createBat(i.FullPath,textBoxOutput.Text,textBoxDirectoryName.Text); 127 }); 128 }finally{ 129 semaphore.Release(); 130 } 131 }); 132 }); 133 134 //監視開始 135 watcher.EnableRaisingEvents = true; 136 } 137 } 138 static class FileSystemWatcherExtensions{ 139 //ChangedイベントをIObservable<TEventArgs>にする 140 public static IObservable<FileSystemEventArgs> ChangedAsObservable(this FileSystemWatcher self){ 141 return Observable.FromEvent<FileSystemEventHandler, FileSystemEventArgs>( 142 h => (_, e) => h(e), 143 h => self.Changed += h, 144 h => self.Changed -= h); 145 } 146 } 147 }
試したこと・調べたこと
- teratailやGoogle等で検索した
- ソースコードを自分なりに変更した
- 知人に聞いた
- その他
上記の詳細・結果
自分なりに調べてみたのですが行き詰ってしまいました。一般論として、非同期処理自体が難しく再現性の低いエラーを起こしやすいようではありますが…
非同期処理を含んだアプリの開発自体が初めてのため、下記のように、どの部分を非同期とするべきなのかもきちんと理解できていないのが要因の一つと認識しています。
長文で大変恐縮ではございますが、ご教示いただけますと幸いです。
・createBatをpublic async Task<string> createBat()に変更してみましたが、その中でSemaphoreSlimを書いても、実行スレッド数が制御できませんでした。呼び出し側(button1_Clickイベントハンドラ)のSemaphoreSlimを削除しても同じであったため、この方法は一旦断念しています。
・以下のように、排他制御をsemaphoreではなくLockにしてみましたが、結果は変わりませんでした。
C#
1private object lockObj = new object(); 2private void button1_Click(object sender,EventArgs e) { 3 lock(lockObj){ 4 watcher.ChangedAsObservable() 5 //FullPathでグループ化 6 .GroupBy(i => i.FullPath) 7 .Subscribe(g =>{ 8 string exe = Path.GetExtension(g.Key.ToString()); 9 //グループ(FullPathの値)ごとに1秒待機 10 g.Throttle(TimeSpan.FromSeconds(1)) 11 //拡張CSV、TXT、DATが対象 12 .Where(e => exe == ".csv" || exe == ".txt" || exe == ".dat") 13 .Subscribe(async i =>{ 14 await Task.Run(() =>{ 15 semaphore.Wait(); 16 createBat(i.FullPath,textBoxOutput.Text,textBoxDirectoryName.Text); 17 semaphore.Release(); 18 }); 19 }); 20 }); 21 } 22 } 23}
補足
2024年9月6日追記
こちらの都合で恐縮ですが、この後出張や休暇が連続いたします。
コメント・回答をいただいた場合、反応が遅れてしまう可能性がございます。
場合によっては、9月第三週以降になるかもしれません。
恐れ入りますが、何卒ご了承ください。
2024年9月5日追記
ご助言に従ってログ出力を実装し、関数の引数や発生時刻を記述しました。
これが原因かは分かりませんが、createBatの呼び出しや、バッチファイルの書き込みが各スレッドでミリ秒まで一致している場合に事象1(正しく書き込まれない)が発生していました。
スレッドが異なっていたとしても、ミリ秒まで同じタイミングで実行すると不具合が生じるのでしょうか?