在庫最適化と安全在庫配置システム MESSA (MEta Safety Stock Allocation system)
サービスレベルと安全在庫係数
ここでは,小売店に顧客がやってくることによって需要が発生する場合を考える. 顧客は,毎日小売店にやってきて商品を購入するが,その量は気分によって変わるものとする. 一般には,商品が品切れしてしまうことは避けなければならない. 商品を余分に在庫しておく費用が,品切れしてしまう費用より小さい場合には, 商品を余分に在庫して気まぐれな顧客のために準備しておくことが常套手段である. このように,需要の不確実性に対処するためにもつ在庫を安全在庫(safety inventory, safety stock)とよぶ.
いま,簡単のため,毎日の需要量は,定常な分布にしたがっているものとする. ここで「定常」とは,日によって需要の分布が変化しないことを指す. 分布によっては,負の需要量になる可能性があるが,その場合には需要量を $0$ と定義する. たとえば, 需要が正規分布と仮定した場合には,負の部分を除いた正規分布(切断正規分布)を指すものとする.
data = np.random.normal(100,10,1000)
fig, best_dist, best_fit_name, best_fit_params = best_distribution(data)
plotly.offline.plot(fig);
print(best_fit_name, best_fit_params)
data = np.random.normal(100,10,1000)
fig, hist_dist = best_histogram(data, nbins = 20)
plotly.offline.plot(fig);
得られた分布に基いてランダムな需要を発生させる.
best_dist.mean(),hist_dist.mean()
得られた分布に対して,在庫管理に必要な統計量を計算する.
print( best_dist.ppf(0.9), hist_dist.ppf(0.9) ) #90%品切れしないための在庫量
print( best_dist.sf(200), hist_dist.sf(200) ) #在庫が200のときの品切れ率
需要分布に対して最適な適合をもつ分布を返す関数 fit_demand
引数:
- demand_df: 需要データフレーム
- cust_selected: 需要の分布を求めたい顧客名のリスト
- prod_selected: 需要の分布を求めたい製品名のリスト
- agg_period: 集約を行う期。例えば1週間を1期とするなら"1w"とする。既定値は1日("1d")
- nbins: ヒストグラムのビン数(既定値は50)
- L: リード時間(在庫管理で用いる.L日前からL-1前の需要の合計の分布をとる.)
返値:
dic: 顧客名と製品名のタプルをキーとし,分布の情報を値とした辞書; 分布の情報は以下のタプル
- fig: 誤差が最小の分布の密度関数と度数分布表の図
- fig2: 近似的な表形式の確率関数と度数分布表の図
- best_dist: 誤差最小の分布の密度関数
- hist_dist: 度風分布表のデータから構成された確率関数
- best_fit_name: 誤差最小の分布名
- best_fit_params: 誤差を最小にする分布のパラメータ
demand_df = pd.read_csv(folder+"demand_.csv")
#demand_df = pd.read_csv(folder+"demand_all.csv")
cust_selected = [ demand_df["cust"].iloc[11]]
prod_selected = [ demand_df["prod"].iloc[1] ]
#cust_selected = ["前橋市"]
#prod_selected = ["F"]
#print(cust_selected, prod_selected)
dic = fit_demand(demand_df, cust_selected, prod_selected, agg_period ="1d", nbins=50, L=3)
if dic[ cust_selected[0], prod_selected[0] ] is not None:
fig, fig2, best_dist, hist_dist, best_fit_name, best_fit_params = dic[ cust_selected[0], prod_selected[0] ]
print(best_fit_name, best_fit_params)
plotly.offline.plot(fig);
(Q,R)方策のシミュレーションを行う関数 simulate_inventory
基在庫レベル ($R=S$) を計算し、在庫シミュレーションをNumPyを用いて行う。 正味在庫レベル(実在庫と輸送中在庫の和)が基在庫レベル $S$ を下回ったら、固定数量 $Q$ だけ生産する。
引数:
- n_samples: シミュレーションを行うサンプル数
- n_periods: 期間数
- mu: 需要の平均
- sigma: 需要の標準偏差
- LT: リード時間
- LS: ロットサイズ ($=Q$)
- z: 安全在庫係数;安全在庫は基在庫レベル $S$ を求めるときに使われる。
- b: バックオーダー費用
- h: 在庫費用
返値:
- cost: 総費用の期待値
- I: 在庫の時間的な変化を表すNumPy配列で、形状は (n_samples, n_periods+1);在庫の可視化に用いる。
cost, I = simulate_inventory(hist_dist, n_samples =10, n_periods =100, z=2.33)
print("cost=",cost)
pd.DataFrame(I[0]).plot(); #在庫の推移の可視化
def base_stock_simulation_using_dist(n_samples, n_periods, demand, capacity, LT, b, h, S):
"""
単一段階在庫システムの定期発注方策に対するシミュレーションと微分値の計算
"""
I = np.zeros((n_samples, n_periods+1))
T = np.zeros((n_samples, n_periods+1))
I[:, 0] = S # initial inventory
production = np.zeros((n_samples, LT))
sum_dC = 0.
for t in range(n_periods):
I[:, t+1] = I[:, t] - demand[:, t] + \
production[:, (t-LT) % LT] # 在庫量の更新
prod = np.minimum(capacity, S+demand[:, t]-I[:, t]-T[:, t]) # 生産量の計算
#print(t,demand[:,t],I[:,t],ITI[:,t],prod,production[:, (t-LT) % LT])
T[:, t+1] = T[:, t] + prod - production[:, (t-LT) % LT] # 輸送中在庫量の更新
production[:, t % LT] = prod # 生産量の更新
dC = np.where(I[:, t] < 0, -b, h)
sum_dC += dC.sum()
total_cost = (-1*b*I[I < 0].sum() + h*I[I > 0].sum())/n_periods/n_samples
return sum_dC/n_samples/n_periods, total_cost, I
n_samples = 10
n_periods = 1000
capacity = 10.
convergence = 1e-5
b = 100
h = 1
LT = 1
critical_ratio = b/(b+h)
S = best_dist.ppf(critical_ratio) + 5
print(S, best_dist.ppf(0.01))
demand = best_dist.rvs((n_samples,n_periods)) + 5.
print("t: S dS Cost")
for iter_ in range(100):
dC, cost, I = base_stock_simulation_using_dist(n_samples, n_periods, demand, capacity, LT, b, h, S)
S = S - 0.1*dC
print(f"{iter_}: {S:.2f} {dC:.3f} {cost:.2f}")
if dC**2<=convergence:
break
n_samples = 100
n_periods = 10000
mu = 100
sigma = 10
LT = 3
z = 2.43
b, h = 100, 1
capacity = 105
S = mu*LT + z*sigma*np.sqrt(LT)
convergence = 1e-5
print("Initial S=", S)
demand = np.random.normal(mu, sigma, (n_samples, n_periods))
print("t: S dS Cost")
for iter_ in range(100):
dC, cost, I = base_stock_simulation(n_samples, n_periods, demand, capacity, LT, b, h, S)
S = S - 10*dC
print(f"{iter_}: {S:.2f} {dC:.3f} {cost:.2f}")
if dC**2<=convergence:
break
np.random.seed(123)
n_samples = 10
n_stages = 3
n_periods = 1000
mu = 100
sigma = 10
LT = np.array([1, 2, 3])
# 初期エシェロン在庫量は、最終需要地点以外には、定期発注方策の1日分の時間を加えておく。
ELT = LT.cumsum()
ELT = ELT + 1
ELT[0] = ELT[0]-1
print("Echelon LT=", ELT)
z = 1.33
b = 100
h = np.array([10., 5., 2.])
S = mu*(ELT) + z*sigma*np.sqrt(ELT)
print("S=", S)
capacity = np.array([120., 120., 130.])
demand=np.random.normal(mu, sigma, (n_samples, n_periods))
print("t: Cost |dC| S ")
epsilon=0.01
for iter_ in range(100):
dC, cost, I=multi_stage_base_stock_simulation(n_samples, n_periods, demand, capacity, LT, b, h, S)
sum_=0.
for j in range(n_stages):
sum_ += dC[j]**2
S[j]=S[j] - 1.*dC[j]
print(f"{iter_}: {cost:.2f}, {sum_:.3f}", S)
if sum_ <= epsilon:
break
from plotly.subplots import make_subplots
fig = make_subplots(rows=3, cols=1, subplot_titles= ["Stage " + str(i) for i in range(3)] )
for j in range(3):
trace = go.Scatter(
x= list(range(n_periods)),
y= I[0,j,:],
mode = 'markers + lines',
marker= dict(size= 5,
line= dict(width=1),
opacity= 0.3
),
name ="Stage "+str(j)
)
fig.add_trace(
trace,
row=j+1, col=1
)
fig.update_layout(height=1000,title_text=f"Inventories for all stages", showlegend=False)
plotly.offline.plot(fig)
ネットワーク型定期発注方策に対するシミュレーションと微分値の計算 network_base_stock_simulation
一般のネットワークに対して実装したものが、以下のnetwork_base_stock_simulationである。
引数:
- G: 有向グラフ(SCMGraphのインスタンス)
- n_samples: シミュレーションを行うサンプル数
- n_periods: 期間数
- demand: 需要地点ごとの需要量を保持した辞書。キーは顧客の番号。値は、需要のサンプルを表すNumPyの配列で、形状は (n_samples, n_periods)。
- capacity: 容量(生産量上限)を表すNumPy配列
- LT: 地点毎のリード時間を表すNumPy配列
- ELT: 地点毎のエシェロンリード時間を表すNumPy配列
- b: バックオーダー費用
- h: 在庫費用
- S: 基在庫レベル
- phi: 部品展開表における親製品を製造するために必要な子製品の量
- alpha: 上流の在庫地点が下流の在庫地点に在庫を配分する率
返値:
- dC: 各地点における総費用の基準在庫レベルによる微分の期待値
- total_cost: 総費用の期待値
- I: 在庫の時間的な変化を表すNumPy配列で、形状はグラフの点の数をn_stagesとしたとき (n_samples, n_stages, n_periods+1)。
G = SCMGraph()
z = 1.65
b = np.array([0.,100.,100.])
h = np.array([1., 5., 2.])
mu = np.array([300., 200., 100.])
sigma = np.array([12., 10., 15.])
capacity = np.array([1320., 1120., 1230.])
G.add_edges_from([(0, 1), (0, 2)])
LT = np.array([3, 1, 2])
ELT = np.zeros(len(G))
phi = {(0, 1): 1, (0, 2): 1}
alpha = {(0, 1): 0.5, (0, 2): 0.5}
n_samples = 100
n_periods = 1000
demand ={} #需要量
for i in G:
if G.out_degree(i) == 0:
demand[i] = np.random.normal(mu[i], sigma[i], (n_samples, n_periods))
# エシェロンリード時間と初期基在庫レベルの設定
ELT, S = initial_base_stock_level(G, LT, mu, z, sigma)
print("ELT=", ELT)
print("S=", S)
print("t: Cost |dC| S ")
epsilon = 0.01
for iter_ in range(100):
dC, cost, I = network_base_stock_simulation(G, n_samples, n_periods, demand, capacity, LT, ELT, b, h, S, phi, alpha)
sum_ = 0.
#print("dC=",dC)
for j in G:
sum_ += dC[j]**2
S[j] = S[j] - 1.*dC[j]
print(f"{iter_}: {cost:.2f}, {sum_:.5f}", S)
if sum_ <= epsilon:
break
from plotly.subplots import make_subplots
fig = make_subplots(rows=3, cols=1, subplot_titles= ["Stage " + str(i) for i in range(3)] )
for j in range(3):
trace = go.Scatter(
x= list(range(n_periods)),
y= I[0,j,:],
mode = 'markers + lines',
marker= dict(size= 5,
line= dict(width=1),
opacity= 0.3
),
name ="Stage "+str(j)
)
fig.add_trace(
trace,
row=j+1, col=1
)
fig.update_layout(height=1000,title_text=f"Inventories for all stages", showlegend=False)
plotly.offline.plot(fig);