from gurobipy import Model, quicksum, GRB
#from mypulp import Model, quicksum, GRB
from collections import OrderedDict, defaultdict
import networkx as nx
乗務員スケジューリング問題
開始時刻と終了時刻が決まっている $N$ 個のタスクを、$K$ 人の等質な乗務員で処理することを考える。 乗務員の稼働時間(タスクを処理する時間とタスク間の移動時間の合計)の上限制約がある下で、タスク間の移動費用の合計を最小化するスケジュールを求めよ。
もとになった論文やデータは以下のサイト(OR Library)を参照されたい。
http://people.brunel.ac.uk/ mastjjb/jeb/orlib/cspinfo.html
切除平面法による定式化
- $N$: タスクの数
- $K$: 乗務員の数
- $s_i, f_i$ : タスク $i$ の開始時刻と終了時刻
- $c_{ij}$: タスク $i$ の直後にタスク $j$ を処理したときの移動時間(費用)
- $x_{ij}$: タスク $i$ の直後にタスク $j$ を処理しとき $1$、それ以外のとき $0$
タスクを点、タスク間の移動を枝としたネットワークを作り、ダミーの始点 $0$ とダミーの終点 $N+1$ を追加しておく。
$$ \begin{array}{lll} minimize & \sum_{i,j} c_{ij} x_{ij} & \\ s.t. & \sum_{j} x_{ji} = \sum_{j} x_{ij} & \forall i=1,2,\ldots,N \\ & \sum_{j} x_{ij} = 1 & \forall i=1,2,\ldots,N \\ & \sum_{j} x_{0j} = K & \\ \end{array} $$最初の制約は、乗務員の移動のフロー整合条件である。2番目の制約は、各タスクが必ず何れかの乗務員によって処理されなければならないことを表す。3番目の制約は、始点から出る乗務員の数がちょうど $K$ 人であることを表す。
上の定式化には、乗務員の移動時間の上限制約が含まれていない。この制約は、切除平面として追加する。
まず,データを読み込み,データ構造を準備する.ネットワークはnetworkXの有向グラフで表現し,移動費用 weight と移動時間にタスクの終了時刻と開始時刻の差を加えた時間 time を枝上に保管しておく. これ(time)は,パス型の定式化で用いる.
fname = "../data/csp/csp50.txt"
f = open(fname, "r")
lines = f.readlines()
n_tasks, time_limit = map(int, lines[0].split())
print("Number of tasks", n_tasks, "Time limit=", time_limit)
K = 27 #number of crews
start, finish = OrderedDict(), OrderedDict()
for t in range(1, n_tasks + 1):
start[t], finish[t] = map(int, lines[t].split())
D = nx.DiGraph()
#タスク間の移動費用の読み込み
for line in lines[1 + n_tasks :]:
tail, head, cost = map(int, line.split())
D.add_edge(tail, head, weight=cost, time=start[head]-start[tail])
#ダミーの始点からタスクへの枝
for t in range(1, n_tasks + 1):
D.add_edge(0, t, weight=0, time=0)
#タスクからダミーの終点への枝
for t in range(1, n_tasks + 1):
D.add_edge(t, n_tasks + 1, weight=0, time=finish[t]-start[t])
切除平面用のコールバック関数を準備する.
def csp_callback(model, where):
if where != GRB.Callback.MIPSOL:
return
SolGraph = nx.DiGraph()
for (i, j) in x:
if model.cbGetSolution(x[i, j]) > 0.1:
SolGraph.add_edge(i, j)
for path in nx.all_simple_paths(SolGraph, source=0, target=n_tasks + 1):
i = path[1]
t_sum = finish[i] - start[i]
for j in path[2:-1]:
t_sum += finish[j] - start[j]
t_sum += max(start[j] - finish[i], 0)
i = j
if t_sum > time_limit:
edges = []
i = path[0]
for j in path[1:]:
edges.append((i, j))
i = j
model.cbLazy(quicksum(x[i, j] for (i, j) in edges) <= len(edges) - 1)
return
model = Model()
x = {}
for (i, j) in D.edges():
x[i, j] = model.addVar(vtype="B", name=f"x[{i},{j}]")
model.update()
for i in range(1, n_tasks + 1):
model.addConstr(
quicksum(x[i, j] for j in D.successors(i)) == 1, name=f"task_execution[{i}]"
)
for i in range(1, n_tasks + 1):
model.addConstr(
quicksum(x[j, i] for j in D.predecessors(i))
== quicksum(x[i, j] for j in D.successors(i)),
name=f"flow_conservation[{i}]",
)
model.addConstr(quicksum(x[0, j] for j in D.successors(0)) == K)
model.setObjective(quicksum(D[i][j]["weight"] * x[i, j] for (i, j) in x), GRB.MINIMIZE)
model.Params.DualReductions = 0
model.params.LazyConstraints = 1
model.optimize(csp_callback)
%matplotlib inline
SolGraph = nx.DiGraph()
for (i, j) in x:
if x[i, j].X > 0.1:
SolGraph.add_edge(i, j)
SolGraph.remove_node(0)
SolGraph.remove_node(n_tasks + 1)
nx.draw(SolGraph, pos=nx.spring_layout(SolGraph))
def k_th_sp(G, source, sink, k, weight="weight", time_limit=time_limit):
"""
Find k-th shortest paths and returns path and its cost
"""
time_list, path_list = [], []
for i, p in enumerate(nx.shortest_simple_paths(G, source, sink, weight=weight)):
if i >= k:
break
v = p[0]
time = 0.0
for w in p[1:]:
time += G[v][w][weight]
v = w
if time > time_limit:
break
time_list.append(time)
path_list.append(tuple(p))
return time_list, path_list
次に、タスクを含むパスの集合を辞書 Paths に保管し、パスの費用 C を計算しておく。
time_list, path_list = k_th_sp(D, 0, n_tasks + 1, 100000, weight="time")
C = {}
Paths = defaultdict(set)
for p, path in enumerate(path_list):
cost_ = 0
for j in range(1, len(path) - 1):
cost_ += D[path[j]][path[j + 1]]["weight"]
Paths[path[j]].add(p)
C[p] = cost_
定式化に必要なパラメータと変数を定義しておく。
- $P$ : パスの集合
- $C_p$: パス $p$ の費用
- $Paths_i$: タスク $i$ を処理するパスの集合
- $X_p$: パス $p$ を選択したとき $1$、それ以外のとき $0$
パス型定式化は,以下のようになる.
$$ \begin{array}{lll} minimize & \sum_{p} C_{p} X_{p} & \\ s.t. & \sum_{p \in Paths_i} X_p \geq 1 & \forall i=1,2,\ldots,N \\ & \sum_{p} X_{p} = K & \\ \end{array} $$上の定式化を用いて求解する.
model = Model()
X = {}
for p in C:
X[p] = model.addVar(vtype="B", name=f"x({p})")
model.update()
for t in range(1, n_tasks + 1):
model.addConstr(quicksum(X[p] for p in Paths[t]) >= 1)
model.addConstr(quicksum(X[p] for p in X) == K)
model.setObjective(quicksum(C[p] * X[p] for p in X), GRB.MINIMIZE)
model.optimize()
print(model.ObjVal)
SolGraph = nx.DiGraph()
for p in X:
if X[p].X > 0.01:
for j in range(1, len(path_list[p]) - 2):
SolGraph.add_edge(path_list[p][j], path_list[p][j + 1])
nx.draw(SolGraph, pos=nx.spring_layout(SolGraph))
同等の解が,より高速に得られた.
航空機産業における乗務員スケジューリング問題
航空機産業は,乗務員スケジューリング問題が最も有効に活用されている分野である.
航空機産業において,処理すべきタスクの集合は,以下の4つの階層に分けると考えやすい.
便および回送:便(flight, flight segment)とは,航空機がある空港を決められた時刻に出発し,途中で着陸することなしに飛行し,別の空港に決められた時刻に到着することを表す.
回送(deadhead)とは,乗務員(パイロット,パーサーなど)が便に乗客として乗るか,他の移動手段で空港間を移動することを指す.
任務: 任務(duty)とは,1日の乗務員のスケジュールを表し,1つ以上の便および回送とその間の休息時間から構成される.
ペアリング: ペアリング(paring, rotation)とは,ある出発地点(home base)から出発し, 再び同じ出発地点に戻る乗務員のスケジュールを表し,1つ以上の任務とその間の休息期間から構成される.おおむね1週間の乗務員のスケジュールを表す.
個別月間ブロック:個別月間ブロック(personalized monthly block)とは,月間の乗務員のスケジュールを表し,1 つ以上のペアリングとその間の休息期間,長期休暇,待機期間などから構成される. 勤務表(rostering)ともよばれる.
以下では,ペアリングと個別月間ブロックを生成する最適化モデルを紹介する.
乗務員ペアリング問題
乗務員ペアリング問題(crew paring problem)は,各便をすべてカバーするように乗務員のペアリング(1週間程度のスケジュール)を作成する. これには,列生成法が用いられる.任務を点とし,点に利益(双対変数)が与えられたネットワーク上のパスを求めることによって列が生成される. パスは,業務上の様々な制約を考慮する必要があるため,航空機会社ごとに設計される.
集合:
- $P$: 実行可能なペアリングの集合
- $F$: 便の集合
- $F_p$: ペアリング $p \in P$ に含まれる便の集合
パラメータ:
- $c_p$: ペアリング $p \in P$ に対する費用.通常は,ペアリングに含まれる便の費用の合計,TAFB(time away from base)に時給を乗じたもの,最低賃金の最大値に待ち時間を加えて計算する.
変数:
- $x_p$: ペアリング $p \in P$ を採用するとき $1$,それ以外のとき $0$
上の記号を用いると,乗務員ペアリング問題は以下のように定式化できる.
$$ \begin{array}{cll} minimize & \sum_{p \in P} c_p x_p & \\ s.t. & \sum_{p: f \in F_p} x_p \geq 1 & \forall f \in F \\ & x_p \in \{ 0,1 \} & \forall p \in P \end{array} $$ペアリングの総数は膨大になるので,実際の求解には,列生成法が用いられる.
乗務員割当問題
個別月間ブロック割当て問題(monthly block assignment problem)は乗務員割当問題(crew assignment problem)ともよばれ, 乗務員ペアリング問題で決められたペアリングを,個々の乗務員に割り振る問題である.
この問題は,航空機会社によって異なる方式で解決される.
入札問題(bidline problem):乗務員の個人の情報を用いないで,すべてのペアリングをカバーする個別月間ブロックを作成する. その後で,得られた個別月間ブロックを乗務員ごとに決められた優先順位の順(通常は年齢順)に選択していく.この方式は,主に北米の航空機会社で採用されている. 我が国のトラック業界では,しばしばカルタ取り方式とよばれている.
乗務員勤務表問題 (crew rostering problem):乗務員の個々の要求(desiderata)に基づき個別月間ブロックを作成する.この方法は,ヨーロッパにおいて主流である.
以下では,典型的な乗務員割当問題に対する定式化を示す.
集合:
- $F$: 便の集合
- $P$: 実行可能なペアリングの集合.上の乗務員ペアリング問題で求めてあると仮定する
- $L$: 乗務員(パイロットもしくはキャビンクルー)の集合
- $S_{\ell}$: 乗務員 $\ell \in L$ に対する実行可能な個別スケジュールの集合
- $V_{\ell}$: 乗務員 $\ell \in L$ が選好する休暇の集合
- $B_{\ell}$: 乗務員 $\ell \in L$ が選好する便の集合
パラメータ:
- $C_s^{\ell}$: 乗務員 $\ell \in L$ がスケジュール $s$ を遂行したときの費用.選好する便や休暇を考慮して決定する
- $n_p$: ペアリング $p$ に必要な乗務員の数
- $u$: 選好する便数の下限
- $w$: 選好する休暇数の下限
- $e_{p}^{s, \ell}$: ペアリング $p$ が乗務員 $\ell$ のスケジュール $s$ に含まれるとき $1$,それ以外のとき $0$ の定数
- $\eta_f^p$: 便 $f$ がペアリング $p$ に含まれるとき $1$,それ以外のとき $0$ の定数
- $\zeta_{v}^{s, \ell}$: 休暇 $v$ が乗務員 $\ell$ のスケジュール $s$ に含まれるとき $1$,それ以外のとき $0$ の定数
変数:
- $x_s^{\ell}$: 乗務員 $\ell \in L$ がスケジュール $s \in S_{\ell}$ を遂行するとき $1$,それ以外のとき $0$
上の記号を用いると,乗務員勤務表問題は以下のように定式化できる.
$$ \begin{array}{cll} minimize & \sum_{\ell \in L} \sum_{s \in S_{\ell}} C_s^{\ell} x_s^{\ell} & \\ s.t. & \sum_{\ell \in L} \sum_{s \in S_{\ell}} e_{p}^{s, \ell} x_s^{\ell} \geq n_p & \forall p \in P \\ & \sum_{s \in S_{\ell}} x_s^{\ell} \leq 1 & \forall \ell \in L \\ & \sum_{\ell \in L} \sum_{f \in B_{\ell}} \sum_{s \in S_{\ell}} \sum_{p \in P} \eta_f^p e_{p}^{s, \ell} x_s^{\ell} \geq u & \\ & \sum_{\ell \in L} \sum_{v \in V_{\ell}} \sum_{s \in S_{\ell}} \zeta_{v}^{s, \ell} x_s^{\ell} \geq w & \\ & x_s^{\ell} \in \{ 0,1 \} & \forall \ell \in L, s \in S_{\ell} \end{array} $$可能なスケジュールの総数は膨大になるので,実際の求解には,列生成法を用いる.