Solución:
Con una versión más reciente de Pandas, hay una resample
método muy rápido y útil para realizar la misma tarea:
ohlc_dict = {
'Open':'first',
'High':'max',
'Low':'min',
'Close': 'last',
'Volume': 'sum'
}
df.resample('5T', how=ohlc_dict, closed='left', label="left")
Su enfoque es sólido, pero falla porque cada función en el dict-of-functions aplicado a agg () recibe un objeto Series que refleja la columna que coincide con el valor de la clave. Por lo tanto, no es necesario volver a filtrar por etiqueta de columna. Con esto, y asumiendo que groupby conserva el orden, puede dividir la Serie para extraer el primer / último elemento de las columnas Abrir / Cerrar (nota: la documentación de groupby no pretende preservar el orden de las series de datos originales, pero parece que lo hace en la práctica).
In [50]: df.groupby(dr5minute.asof).agg({'Low': lambda s: s.min(),
'High': lambda s: s.max(),
'Open': lambda s: s[0],
'Close': lambda s: s[-1],
'Volume': lambda s: s.sum()})
Out[50]:
Close High Low Open Volume
key_0
1999-01-04 10:20:00 1.1806 1.1819 1.1801 1.1801 34
1999-01-04 10:25:00 1.1789 1.1815 1.1776 1.1807 91
1999-01-04 10:30:00 1.1791 1.1792 1.1776 1.1780 16
Como referencia, aquí hay una tabla para resumir los tipos de entrada y salida esperados de una función de agregación según el tipo de objeto groupby y cómo se pasan las funciones de agregación a agg ().
agg() method agg func agg func agg()
input type accepts returns result
GroupBy Object
SeriesGroupBy function Series value Series
dict-of-funcs Series value DataFrame, columns match dict keys
list-of-funcs Series value DataFrame, columns match func names
DataFrameGroupBy function DataFrame Series/dict/ary DataFrame, columns match original DataFrame
dict-of-funcs Series value DataFrame, columns match dict keys, where dict keys must be columns in original DataFrame
list-of-funcs Series value DataFrame, MultiIndex columns (original cols x func names)
De la tabla anterior, si la agregación requiere acceso a más de una columna, la única opción es pasar una sola función a un objeto DataFrameGroupBy. Por lo tanto, una forma alternativa de realizar la tarea original es definir una función como la siguiente:
def ohlcsum(df):
df = df.sort()
return {
'Open': df['Open'][0],
'High': df['High'].max(),
'Low': df['Low'].min(),
'Close': df['Close'][-1],
'Volume': df['Volume'].sum()
}
y aplique agg () con él:
In [30]: df.groupby(dr5minute.asof).agg(ohlcsum)
Out[30]:
Open High Low Close Volume
key_0
1999-01-04 10:20:00 1.1801 1.1819 1.1801 1.1806 34
1999-01-04 10:25:00 1.1807 1.1815 1.1776 1.1789 91
1999-01-04 10:30:00 1.1780 1.1792 1.1776 1.1791 16
Aunque los pandas pueden ofrecer algo de magia incorporada más limpia en el futuro, es de esperar que esto explique cómo trabajar con las capacidades actuales de agg ().
Dentro de mi principal() función Estoy recibiendo transmisión de datos de oferta / demanda. Luego hago lo siguiente:
df = pd.DataFrame([])
for msg_type, msg in response.parts():
if msg_type == "pricing.Price":
sd = StreamingData(datetime.now(),instrument_string(msg),
mid_string(msg),account_api,account_id,
's','5min',balance)
df = df.append(sd.df())
sd.resample(df)
Creé una clase StreamingData () que toma la entrada proporcionada (también creó algunas funciones para dividir los datos de oferta / demanda en componentes individuales (oferta, demanda, medio, instrumento, etc.).
La belleza de esto es todo lo que tienes que hacer es cambiar el ‘s’ y ‘5 minutos’ a los plazos que desee. Configúrelo en ‘m’ y ‘D’ para obtener los precios diarios por minuto.
Esto es lo que mi StreamingData () parece:
class StreamingData(object):
def __init__(self, time, instrument, mid, api, _id, xsec, xmin, balance):
self.time = time
self.instrument = instrument
self.mid = mid
self.api = api
self._id = _id
self.xsec = xsec
self.xmin = xmin
self.balance = balance
self.data = self.resample(self.df())
def df(self):
df1 = pd.DataFrame({'Time':[self.time]})
df2 = pd.DataFrame({'Mid':[float(self.mid)]})
df3 = pd.concat([df1,df2],axis=1,join='inner')
df = df3.set_index(['Time'])
df.index = pd.to_datetime(df.index,unit="s")
return df
def resample(self, df):
xx = df.to_period(freq=self.xsec)
openCol = xx.resample(self.xmin).first()
highCol = xx.resample(self.xmin).max()
lowCol = xx.resample(self.xmin).min()
closeCol = xx.resample(self.xmin).last()
self.data = pd.concat([openCol,highCol,lowCol,closeCol],
axis=1,join='inner')
self.data['Open'] = openCol.round(5)
self.data['High'] = highCol.round(5)
self.data['Low'] = lowCol.round(5)
self.data['Close'] = closeCol.round(5)
return self.data
Así que toma los datos de StreamingData (), crea un marco de datos indexado en el tiempo dentro df (), lo agrega, luego lo envía a remuestrear (). Los precios que calculo se basan en: mid = (oferta + demanda) / 2