What I want to achieve
I am trying to rewrite a Python DQN reinforcement-learning script into one that uses a parameterized quantum circuit.
Background
I am porting the analysis class Analysis(), defined at URL A, into code B. Code B originally used OpenAI Gym's CartPole as its environment, and I am replacing that environment with Analysis().
Problem and error message
The following line raises an error:

episodes = gather_episodes(state_bounds, n_actions, model, batch_size, Analysis())

My guess is that in the line

normalized_states = [s/state_bounds for i, s in enumerate(states) if not done[i]]

s is of NoneType, so the division s/state_bounds fails, but I do not know how to resolve this. I would appreciate any hints on where the code should be corrected. A minimal sketch of this suspicion follows the error message below.

Error message:
TypeError: float() argument must be a string or a number, not 'NoneType'
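The sketch below is hypothetical code of my own (the class and values are placeholders, not taken from A or B); it only illustrates the suspicion that a reset() with no return statement leaves None entries in states, which then cannot be treated as numbers.

import numpy as np

# Hypothetical stand-in for an environment whose reset() initialises
# internal state but has no return statement.
class NoReturnEnv:
    def reset(self):
        self.dis = 0.0   # state is set up internally...
        # ...but nothing is returned, so the caller receives None

envs = [NoReturnEnv() for _ in range(2)]
states = [e.reset() for e in envs]        # -> [None, None]

# Treating such an entry as a number then fails with a similar TypeError:
np.asarray(states[0], dtype=np.float32)
# e.g. TypeError: float() argument must be a string or a number, not 'NoneType'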
Relevant source code
!pip install openseespy

import urllib.request
import numpy as np
import openseespy.opensees as op

FREE = 0
FIXED = 1
X = 1
Y = 2
ROTZ = 3


class Analysis(gym.Env):
    def __init__(self):
        # Setting of actions that the network can take (values that the building structure could actually take)
        self.action = np.array([0.02, 0.03, 0.05, 0.08, 0.1, 0.12, 0.15, 0.18, 0.2, 0.25, 0.3])
        self.naction = len(self.action)
        self.beta = 1/4

        # Simplified mechanics model
        self.T0 = 4
        self.h = self.action[0]
        self.hs = [self.h]
        self.m = 100
        self.k = 4*np.pi**2*self.m/self.T0**2

        # input seismic motion
        self.dt = 0.02
        to_meter = 0.01  # Value to convert cm to m
        self.wave_url = 'https://raw.githubusercontent.com/kakemotokeita/dqn-seismic-control/main/wave/sample.csv'
        with urllib.request.urlopen(self.wave_url) as wave_file:
            self.wave_data = np.loadtxt(wave_file, usecols=(0,), delimiter=',', skiprows=3)*to_meter

        # Setting of OpenSees
        op.wipe()
        op.model('basic', '-ndm', 2, '-ndf', 3)  # 2 dimensions, 3 dof per node

        # Energy
        global eneinte
        eneinte = 0

        # node
        self.bot_node = 1
        self.top_node = 2
        op.node(self.bot_node, 0., 0.)
        op.node(self.top_node, 0., 0.)

        # boundary conditions
        op.fix(self.top_node, FREE, FIXED, FIXED)
        op.fix(self.bot_node, FIXED, FIXED, FIXED)
        op.equalDOF(1, 2, *[Y, ROTZ])

        # mass
        op.mass(self.top_node, self.m, 0., 0.)

        # Elastic stiffness
        elastic_mat_tag = 1
        Fy = 1e10
        E0 = self.k
        b = 1.0
        op.uniaxialMaterial('Steel01', elastic_mat_tag, Fy, E0, b)

        # settings of element
        beam_tag = 1
        op.element('zeroLength', beam_tag, self.bot_node, self.top_node, "-mat", elastic_mat_tag, "-dir", 1, '-doRayleigh', 1)

        # setting of external force
        load_tag_dynamic = 1
        pattern_tag_dynamic = 1
        self.values = list(-1 * self.wave_data)  # should be negative
        op.timeSeries('Path', load_tag_dynamic, '-dt', self.dt, '-values', *self.values)
        op.pattern('UniformExcitation', pattern_tag_dynamic, X, '-accel', load_tag_dynamic)

        # Damping settings
        self.w0 = op.eigen('-fullGenLapack', 1)[0] ** 0.5
        self.alpha_m = 0.0
        self.beta_k = 2 * self.h / self.w0
        self.beta_k_init = 0.0
        self.beta_k_comm = 0.0
        op.rayleigh(self.alpha_m, self.beta_k, self.beta_k_init, self.beta_k_comm)

        # Analysis settings
        op.wipeAnalysis()
        op.algorithm('Newton')
        op.system('SparseGeneral')
        op.numberer('RCM')
        op.constraints('Transformation')
        op.integrator('Newmark', 0.5, 0.25)
        op.analysis('Transient')
        tol = 1.0e-10
        iterations = 10
        op.test('EnergyIncr', tol, iterations, 0, 2)

        self.i_pre = 0
        self.i = 0
        self.i_next = 0
        self.time = 0
        self.analysis_time = (len(self.values) - 2) * self.dt
        self.dis = 0
        self.vel = 0
        self.acc = 0
        self.a_acc = 0
        self.force = 0
        self.resp = {
            "time": [],
            "dis": [],
            "acc": [],
            "a_acc": [],
            "vel": [],
            "force": [],
        }
        self.done = False

    # Initialization
    def reset(self):
        self.__init__()

    # calculate for one step
    def step(self, action=0):
        self.time = op.getTime()
        assert(self.time < self.analysis_time)

        # Vary damping constants according to the action chosen.
        self.h = self.action[action]
        self.hs.append(self.h)
        self.beta_k = 2 * self.h / self.w0
        op.rayleigh(self.alpha_m, self.beta_k, self.beta_k_init, self.beta_k_comm)

        op.analyze(1, self.dt)
        op.reactions()
        self.dis = op.nodeDisp(self.top_node, 1)
        self.vel = op.nodeVel(self.top_node, 1)
        self.acc = op.nodeAccel(self.top_node, 1)
        self.a_acc = self.acc + self.values[self.i]
        self.force = -op.nodeReaction(self.bot_node, 1)  # Negative since diff node

        self.resp["time"].append(self.time)
        self.resp["dis"].append(self.dis)
        self.resp["vel"].append(self.vel)
        self.resp["acc"].append(self.acc)
        self.resp["a_acc"].append(self.a_acc)
        self.resp["force"].append(self.force)

        next_time = op.getTime()
        self.done = next_time >= self.analysis_time
        self.i_pre = self.i
        self.i += 1
        self.i_next = self.i + 1 if not self.done else self.i
        return self.reward, self.done

    # reward
    @property
    def reward(self):
        return (0.1 / (np.abs(self.a_acc + 8)))**3

    # definition of damping average
    @property
    def h_ave(self):
        return np.average(self.hs)

    # definition of damping variance
    @property
    def h_sd(self):
        return np.sqrt(np.var(self.hs))

    # Current State of Vibration Analysis
    @property
    def state(self):
        return np.array([self.values[self.i_pre], self.values[self.i], self.values[self.i_next],
                         self.a_acc, self.acc, self.vel, self.dis], dtype=np.float32)

    @property
    def sd(self):
        return np.sqrt(np.var(np.abs(self.resp["a_acc"]))), np.sqrt(np.var(np.abs(self.resp["dis"])))

    # Maximum absolute response acceleration [m/s2]
    @property
    def max(self):
        return np.max(np.abs(self.resp["a_acc"])), np.max(np.abs(self.resp["dis"]))


(omitted)


model = generate_model_policy(qubits, n_layers, n_actions, 1.0, observables)


def gather_episodes(state_bounds, n_actions, model, n_episodes, env_name):
    """Interact with environment in batched fashion."""
    trajectories = [defaultdict(list) for _ in range(n_episodes)]
    #envs = [gym.make(env_name) for _ in range(n_episodes)]
    envs = [Analysis() for _ in range(n_episodes)]
    done = [False for _ in range(n_episodes)]
    states = [e.reset() for e in envs]

    while not all(done):
        unfinished_ids = [i for i in range(n_episodes) if not done[i]]
        normalized_states = [s/state_bounds for i, s in enumerate(states) if not done[i]]

        for i, state in zip(unfinished_ids, normalized_states):
            trajectories[i]['states'].append(state)

        # Compute policy for all unfinished envs in parallel
        states = tf.convert_to_tensor(normalized_states)
        action_probs = model([states])

        # Store action and transition all environments to the next state
        states = [None for i in range(n_episodes)]
        for i, policy in zip(unfinished_ids, action_probs.numpy()):
            action = np.random.choice(n_actions, p=policy)
            states[i], reward, done[i], _ = envs[i].step(action)
            trajectories[i]['actions'].append(action)
            trajectories[i]['rewards'].append(reward)

    return trajectories


(omitted)


# Instantiate vibration analysis as an environment
env = Analysis()  # added

# Start training the agent
episode_reward_history = []
for batch in range(n_episodes // batch_size):
    # Gather episodes
    state = env.reset()  # added
    episodes = gather_episodes(state_bounds, n_actions, model, batch_size, Analysis())

    # Group states, actions and returns in numpy arrays
    states = np.concatenate([ep['states'] for ep in episodes])
    actions = np.concatenate([ep['actions'] for ep in episodes])
    rewards, returns = env.step(action.item())
    #rewards = [ep['rewards'] for ep in episodes]
    #returns = np.concatenate([compute_returns(ep_rwds, gamma) for ep_rwds in rewards])
    #returns = np.array(returns, dtype=np.float32)

    id_action_pairs = np.array([[i, a] for i, a in enumerate(actions)])

    # Update model parameters.
    reinforce_update(states, id_action_pairs, returns, model)

    # Store collected rewards
    for ep_rwds in rewards:
        episode_reward_history.append(np.sum(ep_rwds))

    avg_rewards = np.mean(episode_reward_history[-10:])

    print('Finished episode', (batch + 1) * batch_size, 'Average rewards: ', avg_rewards)

    if avg_rewards >= 500.0:
        break
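For context, code B was originally written against OpenAI Gym's CartPole, so gather_episodes assumes the classic (pre-0.26) Gym calling convention: reset() returns the initial observation, and step() returns a 4-tuple. As a point of comparison only (this is the CartPole usage that B replaced, not my Analysis class):

import gym

env = gym.make('CartPole-v1')
obs = env.reset()                        # classic Gym API: reset() returns an observation
obs, reward, done, info = env.step(0)    # step() returns (observation, reward, done, info)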
What I tried
I reviewed the variables so that the step and reset methods of the Analysis class would connect properly to code B; a sketch of the kind of alignment I attempted is shown below.
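For example, one direction I considered but have not verified (the wrapper below is my own sketch, not code from A or B) is to make reset() and step() return values in the shapes that gather_episodes unpacks:

# Sketch only: wrap Analysis so its methods return Gym-style values.
class AnalysisGymLike(Analysis):
    def reset(self):
        super().reset()                      # re-runs __init__() as in the original
        return self.state                    # return the observation instead of None

    def step(self, action=0):
        reward, done = super().step(action)  # the original step() returns (reward, done)
        return self.state, reward, done, {}  # Gym-style 4-tuple

gather_episodes would also have to instantiate this wrapper instead of Analysis for it to take effect, and I am not sure this is the right place to fix things.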
Additional information (framework/tool versions, etc.)
The development environment is Google Colaboratory.
